drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [21/27] git commit: DRILL-939: Add support for query cancellation
Date Sun, 27 Jul 2014 18:47:07 GMT
DRILL-939: Add support for query cancellation


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5e482c17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5e482c17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5e482c17

Branch: refs/heads/master
Commit: 5e482c17d20bcc957be50d570d03f1a5fdfca75e
Parents: 1e9930f
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Tue Jul 22 18:11:44 2014 -0700
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Fri Jul 25 18:33:49 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |   6 +-
 .../exec/client/PrintingResultsListener.java    |   6 +
 .../apache/drill/exec/ops/FragmentContext.java  |   9 ++
 .../exec/physical/impl/SingleSenderCreator.java |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  12 ++
 .../PartitionSenderRootExec.java                |   2 +-
 .../impl/producer/ProducerConsumerBatch.java    |  44 ++++---
 .../UnorderedReceiverBatch.java                 |   3 +
 .../drill/exec/proto/helper/QueryIdHelper.java  |   5 +
 .../drill/exec/record/AbstractRecordBatch.java  |   4 +
 .../drill/exec/rpc/control/ControlTunnel.java   |  13 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |  19 ++-
 .../apache/drill/exec/rpc/data/DataServer.java  |   5 +
 .../drill/exec/rpc/user/UserRpcConfig.java      |   1 +
 .../apache/drill/exec/rpc/user/UserServer.java  |  20 +++-
 .../exec/server/rest/ProfileResources.java      |  17 +++
 .../drill/exec/server/rest/ProfileWrapper.java  |   7 ++
 .../org/apache/drill/exec/work/WorkManager.java |  12 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   2 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |   4 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   2 +-
 .../drill/exec/work/foreman/QueryManager.java   |  59 ++++++---
 .../drill/exec/work/foreman/QueryStatus.java    |  10 ++
 .../exec/work/fragment/FragmentExecutor.java    | 119 ++++++++++---------
 .../work/fragment/NonRootFragmentManager.java   |   8 +-
 .../src/main/resources/rest/profile/profile.ftl |   3 +
 26 files changed, 284 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 6690bf5..36c162a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 import org.apache.drill.exec.rpc.ChannelClosedException;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
@@ -239,8 +240,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     return listener.getResults();
   }
 
-  public void cancelQuery(QueryId id){
-    client.send(RpcType.CANCEL_QUERY, id, Ack.class);
+  public DrillRpcFuture<Ack> cancelQuery(QueryId id){
+    logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+    return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index a5b65ec..3302e7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -41,6 +41,7 @@ public class PrintingResultsListener implements UserResultsListener {
   int    columnWidth;
   BufferAllocator allocator;
   volatile Exception exception;
+  QueryId queryId;
 
   public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) {
     this.allocator = new TopLevelAllocator(config);
@@ -97,7 +98,12 @@ public class PrintingResultsListener implements UserResultsListener {
     return count.get();
   }
 
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
   @Override
   public void queryIdArrived(QueryId queryId) {
+    this.queryId = queryId;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index c426918..3401bc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -75,6 +75,7 @@ public class FragmentContext implements Closeable {
 
   private volatile Throwable failureCause;
   private volatile boolean failed = false;
+  private volatile boolean cancelled = false;
 
   public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
       FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
@@ -117,6 +118,10 @@ public class FragmentContext implements Closeable {
     failureCause = cause;
   }
 
+  public void cancel() {
+    cancelled = true;
+  }
+
   public DrillbitContext getDrillbitContext() {
     return context;
   }
@@ -227,6 +232,10 @@ public class FragmentContext implements Closeable {
     return failed;
   }
 
+  public boolean isCancelled() {
+    return cancelled;
+  }
+
   public FunctionImplementationRegistry getFunctionRegistry() {
     return funcRegistry;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 396f7a2..325e315 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -139,7 +139,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       @Override
       public void failed(RpcException ex) {
         sendCount.decrement();
-        context.fail(ex);
+        if (!context.isCancelled() && !context.isFailed()) {
+          context.fail(ex);
+        }
         stop();
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ee957d9..313fdec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -169,6 +169,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         RawFragmentBatch rawBatch = null;
         try {
           rawBatch = getNext(provider);
+          if (rawBatch == null && context.isCancelled()) {
+            return IterOutcome.STOP;
+          }
         } catch (IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
@@ -181,6 +184,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           }
           try {
             while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+            if (rawBatch == null && context.isCancelled()) {
+              return IterOutcome.STOP;
+            }
           } catch (IOException e) {
             context.fail(e);
             return IterOutcome.STOP;
@@ -300,6 +306,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
+              if (context.isCancelled()) {
+                return IterOutcome.STOP;
+              }
             }
           } catch (IOException | SchemaChangeException e) {
             context.fail(e);
@@ -340,6 +349,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
             nextBatch = getNext(fragProviders[node.batchId]);
           }
+          if (nextBatch == null && context.isCancelled()) {
+            return IterOutcome.STOP;
+          }
         } catch (IOException e) {
           context.fail(e);
           return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 659863f..69be256 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -154,8 +154,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
         try {
           partitioner.partitionBatch(incoming);
         } catch (IOException e) {
-          incoming.kill();
           context.fail(e);
+          incoming.kill();
           return false;
         }
         for (VectorWrapper<?> v : incoming) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 120a611..2dae502 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -71,7 +71,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       wrapper = queue.take();
       logger.debug("Got batch from queue");
     } catch (InterruptedException e) {
-      context.fail(e);
+      if (!(context.isCancelled() || context.isFailed())) {
+        context.fail(e);
+      }
       return IterOutcome.STOP;
     } finally {
       stats.stopWait();
@@ -117,8 +119,11 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
 
   private class Producer implements Runnable {
 
+    RecordBatchDataWrapper wrapper;
+
     @Override
     public void run() {
+      try {
       if (stop) return;
       outer: while (true) {
         IterOutcome upstream = incoming.next();
@@ -135,14 +140,17 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           case OK_NEW_SCHEMA:
           case OK:
             try {
-              if (!stop) queue.put(new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false));
+              if (!stop) {
+                wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
+                queue.put(wrapper);
+              }
             } catch (InterruptedException e) {
-              context.fail(e);
-              try {
-                queue.putFirst(new RecordBatchDataWrapper(null, false, true));
-              } catch (InterruptedException e1) {
-                throw new RuntimeException(e1);
+              if (!(context.isCancelled() || context.isFailed())) {
+                context.fail(e);
               }
+              wrapper.batch.getContainer().zeroVectors();
+              incoming.cleanup();
+              break outer;
             }
             break;
           default:
@@ -152,14 +160,15 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       try {
         queue.put(new RecordBatchDataWrapper(null, true, false));
       } catch (InterruptedException e) {
-        context.fail(e);
-        try {
-          queue.putFirst(new RecordBatchDataWrapper(null, false, true));
-        } catch (InterruptedException e1) {
-          throw new RuntimeException(e1);
+        if (!(context.isCancelled() || context.isFailed())) {
+          context.fail(e);
         }
+
+      }
+      } finally {
+        incoming.cleanup();
+        logger.debug("Producer thread finished");
       }
-      logger.debug("Producer thread finished");
     }
   }
 
@@ -174,7 +183,13 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
 
   @Override
   protected void killIncoming() {
-    incoming.kill();
+    producer.interrupt();
+    stop = true;
+    try {
+      producer.join();
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted while waiting for producer thread");
+    }
   }
 
   @Override
@@ -182,7 +197,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
     stop = true;
     clearQueue();
     super.cleanup();
-    incoming.cleanup();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 7424870..79669fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -138,6 +138,9 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
       if (batch == null) {
         batchLoader.clear();
+        if (context.isCancelled()) {
+          return IterOutcome.STOP;
+        }
         return IterOutcome.NONE;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index 3bbe231..bb12a22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -31,6 +31,11 @@ public class QueryIdHelper {
     return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
   }
 
+  public static QueryId getQueryIdFromString(String queryId) {
+    UUID uuid = UUID.fromString(queryId);
+    return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
+  }
+
   public static String getQueryIdentifier(FragmentHandle h) {
     return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 4c1f82d..088b120 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -69,6 +69,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     IterOutcome next = null;
     stats.stopProcessing();
     try{
+      if (context.isCancelled()) {
+        return IterOutcome.STOP;
+      }
       next = b.next();
     }finally{
       stats.startProcessing();
@@ -82,6 +85,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
       stats.batchReceived(inputIndex, b.getRecordCount(), false);
       break;
     }
+
     return next;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 432acab..9a26039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -51,10 +51,9 @@ public class ControlTunnel {
     manager.runCommand(b);
   }
 
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
-    CancelFragment b = new CancelFragment(handle);
+  public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
+    CancelFragment b = new CancelFragment(outcomeListener, handle);
     manager.runCommand(b);
-    return b.getFuture();
   }
 
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
@@ -85,17 +84,17 @@ public class ControlTunnel {
 
   }
 
-  public static class CancelFragment extends FutureBitCommand<Ack, ControlConnection> {
+  public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
     final FragmentHandle handle;
 
-    public CancelFragment(FragmentHandle handle) {
-      super();
+    public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+      super(listener);
       this.handle = handle;
     }
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle,  Ack.class);
+      connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 28050eb..cbfa1f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -20,7 +20,11 @@ package org.apache.drill.exec.rpc.control;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
 import org.apache.drill.exec.cache.DistributedMap;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -44,6 +48,10 @@ public class WorkEventBus {
   private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(
       16, 0.75f, 16);
   private final WorkerBee bee;
+  private final Cache<FragmentHandle,Void> cancelledFragments = CacheBuilder.newBuilder()
+          .maximumSize(10000)
+          .expireAfterWrite(10, TimeUnit.MINUTES)
+          .build();
 
   public WorkEventBus(WorkerBee bee) {
     this.bee = bee;
@@ -85,7 +93,16 @@ public class WorkEventBus {
     return managers.get(handle);
   }
 
+  public void cancelFragment(FragmentHandle handle) {
+    cancelledFragments.put(handle, null);
+    removeFragmentManager(handle);
+  }
+
   public FragmentManager getOrCreateFragmentManager(FragmentHandle handle) throws FragmentSetupException{
+    if (cancelledFragments.asMap().containsKey(handle)) {
+      logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+      return null;
+    }
     FragmentManager manager = managers.get(handle);
     if (manager != null) return manager;
     DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE);
@@ -99,7 +116,7 @@ public class WorkEventBus {
       throw new FragmentSetupException("Received batch where fragment was not in cache.");
     }
 
-    FragmentManager newManager = new NonRootFragmentManager(fragment, bee.getContext());
+    FragmentManager newManager = new NonRootFragmentManager(fragment, bee);
 
     // since their could be a race condition on the check, we'll use putIfAbsent so we don't have two competing
     // handlers.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index a449032..42dee94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -107,6 +107,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
 
     try {
       FragmentManager manager = workBus.getOrCreateFragmentManager(fragmentBatch.getHandle());
+      if (manager == null) {
+        if (body != null) {
+          body.release();
+        }
+      }
       BufferAllocator allocator = manager.getFragmentContext().getAllocator();
       if(body != null){
         if(!allocator.takeOwnership((AccountingByteBuf) body.unwrap())){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 0c04fec..be5bb8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -32,6 +32,7 @@ public class UserRpcConfig {
   public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
       .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
       .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
+      .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
       .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
       .build();
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index aaf3c2d..9b5e830 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -74,7 +75,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
     switch (rpcType) {
 
     case RpcType.RUN_QUERY_VALUE:
-      logger.trace("Received query to run.  Returning query handle.");
+      logger.debug("Received query to run.  Returning query handle.");
       try {
         RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
         return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
@@ -83,7 +84,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       }
 
     case RpcType.REQUEST_RESULTS_VALUE:
-      logger.trace("Received results requests.  Returning empty query result.");
+      logger.debug("Received results requests.  Returning empty query result.");
       try {
         RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody));
         return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req));
@@ -92,8 +93,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       }
 
     case RpcType.CANCEL_QUERY_VALUE:
-      logger.warn("Cancel requested but not supported yet.");
-      return new Response(RpcType.ACK, Acks.OK);
+      try {
+        QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        return new Response(RpcType.ACK, worker.cancelQuery(queryId));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding QueryId body.", e);
+      }
 
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
@@ -125,7 +130,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
     public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
       logger.trace("Sending result to client with {}", result);
-      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, false, result.getBuffers());
+    }
+
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+      logger.trace("Sending result to client with {}", result);
+      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
index 5ae4267..9cbc2e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
@@ -35,6 +35,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.QueryStatus;
@@ -169,4 +170,20 @@ public class ProfileResources {
     return new Viewable("/rest/profile/profile.ftl", wrapper);
 
   }
+
+  @GET
+  @Path("/profiles/cancel/{queryid}")
+  @Produces(MediaType.TEXT_PLAIN)
+  public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+    PStore<QueryProfile> profiles = work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+    QueryProfile profile = profiles.get(queryId);
+    if (profile != null && (profile.getState() == QueryState.RUNNING || profile.getState() == QueryState.PENDING)) {
+      work.getUserWorker().cancelQuery(QueryIdHelper.getQueryIdFromString(queryId));
+      return "Cancelled query " + queryId;
+    }
+    if (profile == null) {
+      return "No such query: " + queryId;
+    }
+    return "Query " + queryId + " not running";
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index f92e3c5..706f9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 import java.text.DateFormat;
 import java.text.NumberFormat;
@@ -44,15 +45,21 @@ import java.util.TreeMap;
 
 public class ProfileWrapper {
   public QueryProfile profile;
+  public String id;
 
   public ProfileWrapper(QueryProfile profile) {
     this.profile = profile;
+    this.id = QueryIdHelper.getQueryId(profile.getId());
   }
 
   public QueryProfile getProfile() {
     return profile;
   }
 
+  public String getId() {
+    return id;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 7d5d37f..0407361 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -187,6 +187,10 @@ public class WorkManager implements Closeable {
       return runningFragments.get(handle);
     }
 
+    public void removeFragment(FragmentHandle handle) {
+      runningFragments.remove(handle);
+    }
+
     public Foreman getForemanForQueryId(QueryId queryId) {
       return queries.get(queryId);
     }
@@ -212,9 +216,13 @@ public class WorkManager implements Closeable {
       try {
         while (true) {
           // logger.debug("Polling for pending work tasks.");
-          Runnable r = pendingTasks.take();
+          RunnableWrapper r = pendingTasks.take();
           if (r != null) {
             logger.debug("Starting pending task {}", r);
+            if (r.inner instanceof FragmentExecutor) {
+              FragmentExecutor fragmentExecutor = (FragmentExecutor) r.inner;
+              runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor);
+            }
             executor.execute(r);
           }
 
@@ -228,7 +236,7 @@ public class WorkManager implements Closeable {
 
   private class RunnableWrapper implements Runnable {
 
-    private final Runnable inner;
+    final Runnable inner;
     private final String id;
 
     public RunnableWrapper(Runnable r, String id){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index ee51f3b..afd3fa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -125,7 +125,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
     NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
     try{
       FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+      FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener);
       bee.addFragmentRunner(fr);
     } catch (Exception e) {
       listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 41d70a5..bb56e10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -76,14 +76,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void cleanup() {
-    if (!finished) {
+    if (!finished && !context.isCancelled()) {
       IllegalStateException e = new IllegalStateException("Cleanup before finished");
       context.fail(e);
       throw e;
     }
 
     if (!buffer.isEmpty()) {
-      if (!context.isFailed()) {
+      if (!context.isFailed() && !context.isCancelled()) {
         context.fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", buffer.size());
         RawFragmentBatch batch;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4cc3e63..b1ed8a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -185,7 +185,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   void cleanupAndSendResult(QueryResult result){
     bee.retireForeman(this);
-    initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result));
+    initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true);
     state.updateState(QueryState.RUNNING, QueryState.COMPLETED);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index ed1a428..4e1ca22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.work.foreman;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -63,6 +66,9 @@ public class QueryManager implements FragmentStatusListener{
   private QueryId queryId;
   private FragmentExecutor rootRunner;
   private RunQuery query;
+  private volatile boolean running = false;
+  private volatile boolean cancelled = false;
+  private volatile boolean stopped = false;
 
   public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) {
     super();
@@ -101,7 +107,7 @@ public class QueryManager implements FragmentStatusListener{
       // add fragment to local node.
       status.add(new FragmentData(rootFragment.getHandle(), null, true));
       logger.debug("Fragment added to local node.");
-      rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment));
+      rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, new RootStatusHandler(rootContext, rootFragment));
       RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
       if(buffers.isDone()){
@@ -127,6 +133,10 @@ public class QueryManager implements FragmentStatusListener{
     }
 
     logger.debug("Fragment runs setup is complete.");
+    running = true;
+    if (cancelled && !stopped) {
+      stopQuery();
+    }
   }
 
   private void sendRemoteFragment(PlanFragment fragment){
@@ -193,26 +203,37 @@ public class QueryManager implements FragmentStatusListener{
   private void stopQuery(){
     workBus.removeFragmentStatusListener(queryId);
     // Stop all queries with a currently active status.
-//    for(FragmentData data: map.values()){
-//      FragmentHandle handle = data.getStatus().getHandle();
-//      switch(data.getStatus().getState()){
-//      case SENDING:
-//      case AWAITING_ALLOCATION:
-//      case RUNNING:
-//        if(data.isLocal()){
-//          rootRunner.cancel();
-//        }else{
-//          tun.get(data.getEndpoint()).cancelFragment(handle).addLightListener(new CancelListener(data.endpoint, handle));
-//        }
-//        break;
-//      default:
-//        break;
-//      }
-//    }
+    List<FragmentData> fragments = status.getFragmentData();
+    Collections.sort(fragments, new Comparator<FragmentData>() {
+      @Override
+      public int compare(FragmentData o1, FragmentData o2) {
+        return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
+      }
+    });
+    for(FragmentData data: fragments){
+      FragmentHandle handle = data.getStatus().getHandle();
+      switch(data.getStatus().getProfile().getState()){
+      case SENDING:
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+        if(data.isLocal()){
+          rootRunner.cancel();
+        }else{
+          controller.getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
+        }
+        break;
+      default:
+        break;
+      }
+    }
   }
 
   public void cancel(){
-    stopQuery();
+    cancelled = true;
+    if (running) {
+      stopQuery();
+      stopped = true;
+    }
   }
 
   private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
@@ -234,7 +255,7 @@ public class QueryManager implements FragmentStatusListener{
       // do nothing.
     }
 
-  };
+  }
 
   public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
     return new FragmentSubmitListener(endpoint, value);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index 70de958..62293fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
@@ -38,6 +39,9 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class QueryStatus {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
@@ -48,6 +52,7 @@ public class QueryStatus {
 
   // doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only.
   private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
+  private List<FragmentData> fragmentDataSet = Lists.newArrayList();
 
   private final String queryId;
   private final QueryId id;
@@ -73,6 +78,10 @@ public class QueryStatus {
     this.foreman = foreman;
   }
 
+  public List<FragmentData> getFragmentData() {
+    return fragmentDataSet;
+  }
+
   public void setPlanText(String planText){
     this.planText = planText;
     updateCache();
@@ -106,6 +115,7 @@ public class QueryStatus {
     }
 
     minorMap.put(minorFragmentId, data);
+    fragmentDataSet.add(data);
   }
 
   void update(FragmentStatus status, boolean updateCache){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 51421a7..735e663 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.work.CancelableQuery;
 import org.apache.drill.exec.work.StatusProvider;
 
 import com.codahale.metrics.Timer;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
 /**
  * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
@@ -44,10 +45,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   private final FragmentRoot rootOperator;
   private RootExec root;
   private final FragmentContext context;
+  private final WorkerBee bee;
   private final StatusReporter listener;
+  private Thread executionThread;
 
-  public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, StatusReporter listener){
+  public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){
     this.context = context;
+    this.bee = bee;
     this.rootOperator = rootOperator;
     this.listener = listener;
   }
@@ -60,6 +64,11 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   @Override
   public void cancel() {
     updateState(FragmentState.CANCELLED);
+    logger.debug("Cancelled Fragment {}", context.getHandle());
+    context.cancel();
+    if (executionThread != null) {
+      executionThread.interrupt();
+    }
   }
 
   public UserClientConnection getClient(){
@@ -68,71 +77,75 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   @Override
   public void run() {
-    final String originalThread = Thread.currentThread().getName();
-    String newThreadName = String.format("%s:frag:%s:%s", //
-        QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
-        context.getHandle().getMajorFragmentId(),
-        context.getHandle().getMinorFragmentId()
-        );
-    Thread.currentThread().setName(newThreadName);
-
-    boolean closed = false;
     try {
-      root = ImplCreator.getExec(context, rootOperator);
-    } catch (AssertionError | Exception e) {
-      context.fail(e);
-      logger.debug("Failure while initializing operator tree", e);
-      internalFail(e);
-      return;
-    }
+      final String originalThread = Thread.currentThread().getName();
+      String newThreadName = String.format("%s:frag:%s:%s", //
+          QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
+          context.getHandle().getMajorFragmentId(),
+          context.getHandle().getMinorFragmentId()
+          );
+      Thread.currentThread().setName(newThreadName);
+      executionThread = Thread.currentThread();
+
+      boolean closed = false;
+      try {
+        root = ImplCreator.getExec(context, rootOperator);
+      } catch (AssertionError | Exception e) {
+        context.fail(e);
+        logger.debug("Failure while initializing operator tree", e);
+        internalFail(e);
+        return;
+      }
 
-    logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
-    if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
-      internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
-      return;
-    }
+      logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+      if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+        internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+        return;
+      }
 
 
 
-    // run the query until root.next returns false.
-    try{
-      while(state.get() == FragmentState.RUNNING_VALUE){
-        if(!root.next()){
-          if(context.isFailed()){
-            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
-          }else{
-            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
-          }
+      // run the query until root.next returns false.
+      try{
+        while(state.get() == FragmentState.RUNNING_VALUE){
+          if(!root.next()){
+            if(context.isFailed()){
+              updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+            }else{
+              updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+            }
 
+          }
         }
-      }
 
-      root.stop();
-      if(context.isFailed()) {
-        internalFail(context.getFailureCause());
-      }
+        root.stop();
+        if(context.isFailed()) {
+          internalFail(context.getFailureCause());
+        }
 
-      closed = true;
-
-      context.close();
-    }catch(AssertionError | Exception ex){
-      logger.debug("Caught exception while running fragment", ex);
-      internalFail(ex);
-    }finally{
-      Thread.currentThread().setName(originalThread);
-      if(!closed) {
-        try {
-          root.stop();
-          if(context.isFailed()) {
-            internalFail(context.getFailureCause());
+        closed = true;
+
+        context.close();
+      }catch(AssertionError | Exception ex){
+        logger.debug("Caught exception while running fragment", ex);
+        internalFail(ex);
+      }finally{
+        Thread.currentThread().setName(originalThread);
+        if(!closed) {
+          try {
+            if(context.isFailed()) {
+              internalFail(context.getFailureCause());
+            }
+            context.close();
+          } catch (RuntimeException e) {
+            logger.warn("Failure while closing context in failed state.", e);
           }
-          context.close();
-        } catch (RuntimeException e) {
-          logger.warn("Failure while closing context in failed state.", e);
         }
       }
+    } finally {
+      logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+      bee.removeFragment(context.getHandle());
     }
-    logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
   }
 
   private void internalFail(Throwable excep){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 51bf81c..48d1466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 /**
@@ -48,12 +49,15 @@ public class NonRootFragmentManager implements FragmentManager {
   private final StatusReporter runnerListener;
   private volatile FragmentExecutor runner;
   private volatile boolean cancel = false;
+  private final WorkerBee bee;
   private final FragmentContext context;
   private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
 
-  public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{
+  public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{
     try{
       this.fragment = fragment;
+      DrillbitContext context = bee.getContext();
+      this.bee = bee;
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
@@ -81,7 +85,7 @@ public class NonRootFragmentManager implements FragmentManager {
     synchronized(this){
       if(runner != null) throw new IllegalStateException("Get Runnable can only be run once.");
       if(cancel) return null;
-      runner = new FragmentExecutor(context, root, runnerListener);
+      runner = new FragmentExecutor(context, bee, root, runnerListener);
       return this.runner;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5e482c17/exec/java-exec/src/main/resources/rest/profile/profile.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index 8abb316..ffe7e46 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -49,6 +49,9 @@
     </div>
     <button type="submit" class="btn btn-default">Re-run query</button>
   </form>
+  <form action="/profiles/cancel/${model.id}" method="GET">
+    <button type="link" class="btn btn-default">Cancel query</button>
+  </form>
   <div class="page-header">
   </div>
   <h3>Visualized Plan</h3>


Mime
View raw message