drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [23/27] git commit: DRILL-991: Limit should terminate upstream fragments immediately upon completion
Date Sun, 27 Jul 2014 18:47:09 GMT
DRILL-991: Limit should terminate upstream fragments immediately upon completion


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c331aed8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c331aed8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c331aed8

Branch: refs/heads/master
Commit: c331aed81e73d16ea29bf8c94863591b212aa644
Parents: 5e482c1
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jul 23 20:03:07 2014 -0700
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Fri Jul 25 18:33:53 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/BaseRootExec.java  |   6 +
 .../drill/exec/physical/impl/RootExec.java      |   8 +
 .../drill/exec/physical/impl/ScanBatch.java     |   8 +-
 .../exec/physical/impl/SingleSenderCreator.java |  20 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  13 +-
 .../exec/physical/impl/WriterRecordBatch.java   |   6 +-
 .../physical/impl/aggregate/HashAggBatch.java   |   6 +-
 .../impl/aggregate/StreamingAggBatch.java       |   6 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   9 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  10 +-
 .../physical/impl/limit/LimitRecordBatch.java   |   4 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  52 +-
 .../OrderedPartitionRecordBatch.java            |  12 +-
 .../partitionsender/PartitionOutgoingBatch.java |  26 +
 .../PartitionSenderRootExec.java                |  57 +-
 .../partitionsender/PartitionStatsBatch.java    |  24 -
 .../impl/partitionsender/Partitioner.java       |   3 +-
 .../partitionsender/PartitionerTemplate.java    |  27 +-
 .../impl/producer/ProducerConsumerBatch.java    |   2 +-
 .../exec/physical/impl/sort/SortBatch.java      |  11 +-
 .../impl/union/UnionAllRecordBatch.java         |  10 +-
 .../UnorderedReceiverBatch.java                 |  47 +-
 .../IteratorValidatorBatchIterator.java         |   4 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |  10 +-
 .../exec/planner/physical/PlannerSettings.java  |   2 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   6 +-
 .../exec/record/AbstractSingleRecordBatch.java  |   6 +-
 .../apache/drill/exec/record/RecordBatch.java   |   2 +-
 .../exec/rpc/control/ControlRpcConfig.java      |   2 +
 .../drill/exec/rpc/control/ControlTunnel.java   |  22 +-
 .../exec/work/batch/ControlHandlerImpl.java     |  22 +
 .../exec/work/fragment/FragmentExecutor.java    |   5 +
 .../exec/physical/impl/SimpleRootExec.java      |   6 +
 .../org/apache/drill/exec/proto/BitControl.java | 810 ++++++++++++++++++-
 .../drill/exec/proto/SchemaBitControl.java      | 122 +++
 .../exec/proto/beans/FinishedReceiver.java      | 189 +++++
 protocol/src/main/protobuf/BitControl.proto     |  20 +-
 37 files changed, 1418 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index fa6c997..c2c3144 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
@@ -72,4 +73,9 @@ public abstract class BaseRootExec implements RootExec {
   }
 
   public abstract boolean innerNext();
+
+  @Override
+  public void receivingFragmentFinished(FragmentHandle handle) {
+    logger.warn("Currently not handling FinishedFragment message");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index fcc10aa..42ac4f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.physical.impl;
 
 
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
 /**
  * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
  * output nodes and storage nodes.  They are the driving force behind the completion of a query.
@@ -35,5 +37,11 @@ public interface RootExec {
    * Inform all children to clean up and go away.
    */
   public void stop();
+
+  /**
+   * Inform sender that receiving fragment is finished and doesn't need any more data
+   * @param handle handle identifying the receiving fragment that has finished
+   */
+  public void receivingFragmentFinished(FragmentHandle handle);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a8881f0..21a580b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -119,8 +119,12 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
-  public void kill() {
-    releaseAssets();
+  public void kill(boolean sendUpstream) {
+    if (sendUpstream) {
+      done = true;
+    } else {
+      releaseAssets();
+    }
   }
 
   private void releaseAssets() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 325e315..26aa5ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -56,6 +56,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private int recMajor;
     private FragmentContext context;
     private volatile boolean ok = true;
+    private volatile boolean done = false;
     private final SendingAccountor sendCount = new SendingAccountor();
 
     public enum Metric implements MetricDef {
@@ -81,11 +82,18 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     @Override
     public boolean innerNext() {
       if(!ok){
-        incoming.kill();
+        incoming.kill(false);
         
         return false;
       }
-      IterOutcome out = next(incoming);
+
+      IterOutcome out;
+      if (!done) {
+        out = next(incoming);
+      } else {
+        incoming.kill(true);
+        out = IterOutcome.NONE;
+      }
 //      logger.debug("Outcome of sender next {}", out);
       switch(out){
       case STOP:
@@ -132,8 +140,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       oContext.close();
       incoming.cleanup();
     }
-    
-    
+
+    @Override
+    public void receivingFragmentFinished(FragmentHandle handle) {
+      done = true;
+    }
+
     private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 0132e85..fb9554c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -101,11 +101,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();
   }
@@ -203,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       return IterOutcome.OK_NEW_SCHEMA;
 
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
-      kill();
+      kill(false);
       logger.error("Failure during query", ex);
       context.fail(ex);
       return IterOutcome.STOP;
@@ -297,8 +292,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
 
@@ -334,7 +329,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     }
 
     @Override
-    public void kill() {
+    public void kill(boolean sendUpstream) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 43e0dd4..29b346d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -67,8 +67,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
   @Override
@@ -100,7 +100,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
           try{
             setupNewSchema();
           }catch(Exception ex){
-            kill();
+            kill(false);
             logger.error("Failure during query", ex);
             context.fail(ex);
             return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b30a357..393fa4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -180,7 +180,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       context.fail(ex);
       container.clear();
-      incoming.kill();
+      incoming.kill(false);
       return false;
     }finally{
       stats.stopSetup();
@@ -301,8 +301,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2f71bf9..3913112 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -157,7 +157,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       context.fail(ex);
       container.clear();
-      incoming.kill();
+      incoming.kill(false);
       return false;
     }finally{
       stats.stopSetup();
@@ -338,8 +338,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 46f7d51..7233f69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -242,6 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                     for (VectorWrapper<?> wrapper : left) {
                       wrapper.getValueVector().clear();
                     }
+                    left.kill(true);
                     leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
                     while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
                       for (VectorWrapper<?> wrapper : left) {
@@ -260,7 +261,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             return IterOutcome.NONE;
         } catch (ClassTransformationException | SchemaChangeException | IOException e) {
             context.fail(e);
-            killIncoming();
+            killIncoming(false);
             return IterOutcome.STOP;
         }
     }
@@ -483,9 +484,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     @Override
-    public void killIncoming() {
-        this.left.kill();
-        this.right.kill();
+    public void killIncoming(boolean sendUpstream) {
+        this.left.kill(sendUpstream);
+        this.right.kill(sendUpstream);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0c6657c..24ca463 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -172,7 +172,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           first = true;
         } catch (ClassTransformationException | IOException | SchemaChangeException e) {
           context.fail(new SchemaChangeException(e));
-          kill();
+          kill(false);
           return IterOutcome.STOP;
         } finally {
           stats.stopSetup();
@@ -191,7 +191,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         setRecordCountInContainer();
         return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
       case FAILURE:
-        kill();
+        kill(false);
         return IterOutcome.STOP;
       case NO_MORE_DATA:
         logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
@@ -233,9 +233,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   @Override
-  protected void killIncoming() {
-    left.kill();
-    right.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    left.kill(sendUpstream);
+    right.kill(sendUpstream);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 078c4c4..12ee406 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -83,8 +83,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   @Override
   public IterOutcome innerNext() {
     if(!noEndLimit && recordsLeft <= 0) {
-      // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared
-      // Drain the incoming record batch and clear the memory
+      incoming.kill(true);
+
       IterOutcome upStream = incoming.next();
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 313fdec..b8e18af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -43,6 +44,9 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -60,6 +64,8 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
@@ -87,6 +93,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private BatchSchema schema;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
+  private MergingReceiverPOP config;
   private boolean hasRun = false;
   private boolean prevBatchWasFull = false;
   private boolean hasMoreIncoming = true;
@@ -119,6 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.context = context;
     this.outgoingContainer = new VectorContainer();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
+    this.config = config;
   }
 
   private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
@@ -437,15 +445,49 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   @Override
-  public void kill() {
-    cleanup();
-    for (RawFragmentBatchProvider provider : fragProviders) {
-      provider.kill(context);
+  public void kill(boolean sendUpstream) {
+    if (sendUpstream) {
+      informSenders();
+    } else {
+      cleanup();
+      for (RawFragmentBatchProvider provider : fragProviders) {
+        provider.kill(context);
+      }
+    }
+  }
+
+  private void informSenders() {
+    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+            .setMajorFragmentId(config.getOppositeMajorFragmentId())
+            .setQueryId(context.getHandle().getQueryId())
+            .build();
+    for (int i = 0; i < config.getNumSenders(); i++) {
+      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+              .setMinorFragmentId(i)
+              .build();
+      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+              .setReceiver(context.getHandle())
+              .setSender(sender)
+              .build();
+      context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+    }
+  }
+
+  private class OutcomeListener implements RpcOutcomeListener<Ack> {
+
+    @Override
+    public void failed(RpcException ex) {
+      logger.warn("Failed to inform upstream that receiver is finished");
+    }
+
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      // Do nothing
     }
   }
 
   @Override
-  protected void killIncoming() {
+  protected void killIncoming(boolean sendUpstream) {
     //No op
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index f677e54..45f32cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -300,7 +300,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         partitionVectors.add(w.getValueVector());
       }
     } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
-      kill();
+      kill(false);
       logger.error("Failure while building final partition table.", ex);
       context.fail(ex);
       return false;
@@ -419,8 +419,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
   @Override
@@ -441,7 +441,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
         setupNewSchema(vc);
       } catch (SchemaChangeException ex) {
-        kill();
+        kill(false);
         logger.error("Failure during query", ex);
         context.fail(ex);
         return IterOutcome.STOP;
@@ -474,7 +474,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       try {
         setupNewSchema(vc);
       } catch (SchemaChangeException ex) {
-        kill();
+        kill(false);
         logger.error("Failure during query", ex);
         context.fail(ex);
         return IterOutcome.STOP;
@@ -504,7 +504,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       try {
         setupNewSchema(incoming);
       } catch (SchemaChangeException ex) {
-        kill();
+        kill(false);
         logger.error("Failure during query", ex);
         context.fail(ex);
         return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java
new file mode 100644
index 0000000..71a1590
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+
+public interface PartitionOutgoingBatch {
+
+  public long getTotalRecords();
+
+  public void terminate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 69be256..14cf092 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.MetricValue;
-import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.vector.CopyUtil;
 
@@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
+
+  private final AtomicIntegerArray remainingReceivers;
+  private final AtomicInteger remaingReceiverCount;
+  private volatile boolean done = false;
   
   long minReceiverRecordCount = Long.MAX_VALUE;
   long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
+    this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
+    this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+  }
+
+  private boolean done() {
+    for (int i = 0; i < remainingReceivers.length(); i++) {
+      if (remainingReceivers.get(i) == 0) {
+        return false;
+      }
+    }
+    return true;
   }
 
   @Override
@@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
       return false;
     }
 
-    RecordBatch.IterOutcome out = next(incoming);
+    IterOutcome out;
+    if (!done) {
+      out = next(incoming);
+    } else {
+      incoming.kill(true);
+      out = IterOutcome.NONE;
+    }
 
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     switch(out){
@@ -119,7 +136,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
             sendEmptyBatch();
           }
         } catch (IOException e) {
-          incoming.kill();
+          incoming.kill(false);
           logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
           context.fail(e);
         }
@@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
           }
           createPartitioner();
         } catch (IOException e) {
-          incoming.kill();
+          incoming.kill(false);
           logger.error("Error while flushing outgoing batches", e);
           context.fail(e);
           return false;
         } catch (SchemaChangeException e) {
-          incoming.kill();
+          incoming.kill(false);
           logger.error("Error while setting up partitioner", e);
           context.fail(e);
           return false;
@@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           partitioner.partitionBatch(incoming);
         } catch (IOException e) {
           context.fail(e);
-          incoming.kill();
+          incoming.kill(false);
           return false;
         }
         for (VectorWrapper<?> v : incoming) {
@@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  public void updateStats(List<? extends PartitionStatsBatch> outgoing) {
+  public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) {
     long records = 0;
-    for (PartitionStatsBatch o : outgoing) {
+    for (PartitionOutgoingBatch o : outgoing) {
       long totalRecords = o.getTotalRecords();
       minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
       maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
@@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec {
     stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
     stats.setLongStat(Metric.N_RECEIVERS, outgoing.size());
   }
+
+  @Override
+  public void receivingFragmentFinished(FragmentHandle handle) {
+    int id = handle.getMinorFragmentId();
+    if (remainingReceivers.compareAndSet(id, 0, 1)) {
+      partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate();
+      int remaining = remaingReceiverCount.decrementAndGet();
+      if (remaining == 0) {
+        done = true;
+      }
+    }
+  }
   
   public void stop() {
     logger.debug("Partition sender stopping.");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
deleted file mode 100644
index 85ccffb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-
-public interface PartitionStatsBatch {
-
-  public long getTotalRecords();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 53528ba..c5fe154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
 
 import java.io.IOException;
 import java.util.List;
@@ -44,7 +43,7 @@ public interface Partitioner {
   public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
   public abstract void initialize();
   public abstract void clear();
-  public abstract List<? extends PartitionStatsBatch> getOutgoingBatches();
+  public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index fcbd954..3141aed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner {
   }
 
   @Override
-  public List<? extends PartitionStatsBatch> getOutgoingBatches() {
+  public List<? extends PartitionOutgoingBatch> getOutgoingBatches() {
     return outgoingBatches;
   }
 
@@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
   public abstract int doEval(@Named("inIndex") int inIndex);
 
-  public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible {
+  public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
 
     private final DataTunnel tunnel;
     private final HashPartitionSender operator;
@@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final int oppositeMinorFragmentId;
 
     private boolean isLast = false;
+    private volatile boolean terminated = false;
+    private boolean dropAll = false;
     private BatchSchema outSchema;
     private int recordCount;
     private int totalRecords;
@@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner {
       return false;
     }
 
+    @Override
+    public void terminate() {
+      terminated = true;
+    }
+
     @RuntimeOverridden
     protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
 
@@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner {
     protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
 
     public void flush() throws IOException {
-      final ExecProtos.FragmentHandle handle = context.getHandle();
+      if (dropAll) {
+        vectorContainer.zeroVectors();
+        return;
+      }
+      final FragmentHandle handle = context.getHandle();
 
-      if (recordCount != 0) {
+      if (recordCount != 0 && !terminated) {
 
         for(VectorWrapper<?> w : vectorContainer){
           w.getValueVector().getMutator().setValueCount(recordCount);
@@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner {
         this.sendCount.increment();
       } else {
         logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-        if (isLast) {
+        if (isLast || terminated) {
           // send final (empty) batch
-          FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+          FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
                   handle.getQueryId(),
                   handle.getMajorFragmentId(),
                   handle.getMinorFragmentId(),
@@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner {
             stats.stopWait();
           }
           this.sendCount.increment();
-          vectorContainer.clear();
+          vectorContainer.zeroVectors();
+          dropAll = true;
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 2dae502..f091aa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -182,7 +182,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  protected void killIncoming() {
+  protected void killIncoming(boolean sendUpstream) {
     producer.interrupt();
     stop = true;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index f21673d..dbb547d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -74,11 +74,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();
   }
@@ -148,7 +143,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
       return IterOutcome.OK_NEW_SCHEMA;
 
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
-      kill();
+      kill(false);
       logger.error("Failure during query", ex);
       context.fail(ex);
       return IterOutcome.STOP;
@@ -209,8 +204,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ddee38a..1f2f843 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -59,21 +59,21 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   }
 
   @Override
-  public void kill() {
+  public void kill(boolean sendUpstream) {
     if(current != null){
-      current.kill();
+      current.kill(sendUpstream);
       current = null;
     }
     for(;incomingIterator.hasNext();){
-      incomingIterator.next().kill();
+      incomingIterator.next().kill(sendUpstream);
     }
   }
 
   @Override
-  protected void killIncoming() {
+  protected void killIncoming(boolean sendUpstream) {
     for (int i = 0; i < incoming.size(); i++) {
       RecordBatch in = incoming.get(i);
-      in.kill();
+      in.kill(sendUpstream);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 79669fa..16a68b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver;
 import java.io.IOException;
 import java.util.Iterator;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -28,6 +29,9 @@ import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -40,6 +44,9 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.ControlTunnel.ReceiverFinished;
 
 public class UnorderedReceiverBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
@@ -50,6 +57,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private BatchSchema schema;
   private OperatorStats stats;
   private boolean first = true;
+  private UnorderedReceiver config;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -70,6 +78,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
     this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
+    this.config = config;
   }
 
   @Override
@@ -88,8 +97,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public void kill() {
-    fragProvider.kill(context);
+  public void kill(boolean sendUpstream) {
+    if (sendUpstream) {
+      informSenders();
+    } else {
+      fragProvider.kill(context);
+    }
   }
 
   @Override
@@ -188,4 +201,34 @@ public class UnorderedReceiverBatch implements RecordBatch {
     throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
 
+  private void informSenders() {
+    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+            .setMajorFragmentId(config.getOppositeMajorFragmentId())
+            .setQueryId(context.getHandle().getQueryId())
+            .build();
+    for (int i = 0; i < config.getNumSenders(); i++) {
+      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+              .setMinorFragmentId(i)
+              .build();
+      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+              .setReceiver(context.getHandle())
+              .setSender(sender)
+              .build();
+      context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+    }
+  }
+
+  private class OutcomeListener implements RpcOutcomeListener<Ack> {
+
+    @Override
+    public void failed(RpcException ex) {
+      logger.warn("Failed to inform upstream that receiver is finished");
+    }
+
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      // Do nothing
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 20e4de4..14110e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -82,8 +82,8 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
+  public void kill(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 08219a1..d4b1001 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -132,8 +132,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
+  public void kill(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
   @Override
@@ -324,7 +324,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       return IterOutcome.OK_NEW_SCHEMA;
 
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
-      kill();
+      kill(false);
       logger.error("Failure during query", ex);
       context.fail(ex);
       return IterOutcome.STOP;
@@ -577,8 +577,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index fd584cb..9313018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -44,7 +44,7 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
   public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000);
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d);
-  public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true);
+  public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
   public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
   public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 088b120..e8ad311 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -106,11 +106,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public void kill() {
-    killIncoming();
+  public void kill(boolean sendUpstream) {
+    killIncoming(sendUpstream);
   }
 
-  protected abstract void killIncoming();
+  protected abstract void killIncoming(boolean sendUpstream);
 
   public void cleanup(){
     container.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 721755d..bea7bbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -36,8 +36,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   }
 
   @Override
-  protected void killIncoming() {
-    incoming.kill();
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
   @Override
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
         stats.startSetup();
         setupNewSchema();
       }catch(SchemaChangeException ex){
-        kill();
+        kill(false);
         logger.error("Failure during query", ex);
         context.fail(ex);
         return IterOutcome.STOP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 7617d91..9b28179 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -79,7 +79,7 @@ public interface RecordBatch extends VectorAccessible {
    * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine
    * what has happened.
    */
-  public void kill();
+  public void kill(boolean sendUpstream);
 
   public abstract SelectionVector2 getSelectionVector2();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index b398e47..9953e5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control;
 
 
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -37,6 +38,7 @@ public class ControlRpcConfig {
       .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
       .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
       .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
       .build();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 9a26039..d035c10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -56,6 +57,11 @@ public class ControlTunnel {
     manager.runCommand(b);
   }
 
+  public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){
+    ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver);
+    manager.runCommand(b);
+  }
+
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
     SendFragmentStatus b = new SendFragmentStatus(status);
     manager.runCommand(b);
@@ -84,6 +90,21 @@ public class ControlTunnel {
 
   }
 
+
+  public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> {
+    final FinishedReceiver finishedReceiver;
+
+    public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) {
+      super(listener);
+      this.finishedReceiver = finishedReceiver;
+    }
+
+    @Override
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class);
+    }
+  }
+
   public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
     final FragmentHandle handle;
 
@@ -127,5 +148,4 @@ public class ControlTunnel {
       connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index afd3fa2..893aec8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -74,6 +75,11 @@ public class ControlHandlerImpl implements ControlMessageHandler {
       cancelFragment(handle);
       return DataRpcConfig.OK;
 
+    case RpcType.REQ_RECEIVER_FINISHED_VALUE:
+      FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+      receivingFragmentFinished(finishedReceiver);
+      return DataRpcConfig.OK;
+
     case RpcType.REQ_FRAGMENT_STATUS_VALUE:
       bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER));
       // TODO: Support a type of message that has no response.
@@ -159,6 +165,22 @@ public class ControlHandlerImpl implements ControlMessageHandler {
     return Acks.OK;
   }
 
+  public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) {
+    FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
+
+    FragmentExecutor executor;
+    if(manager != null) {
+      executor = manager.getRunnable();
+    } else {
+      // then try local cancel.
+      executor = bee.getFragmentRunner(finishedReceiver.getSender());
+    }
 
+    if (executor != null) {
+      executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+    }
+
+    return Acks.OK;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 735e663..c5c08e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -71,6 +72,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     }
   }
 
+  public void receivingFragmentFinished(FragmentHandle handle) {
+    root.receivingFragmentFinished(handle);
+  }
+
   public UserClientConnection getClient(){
     return context.getConnection();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 7dce6e0..db8ff8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -85,6 +86,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
   }
 
   @Override
+  public void receivingFragmentFinished(FragmentHandle handle) {
+    //no op
+  }
+
+  @Override
   public Iterator<ValueVector> iterator() {
     List<ValueVector> vv = Lists.newArrayList();
     for(VectorWrapper<?> vw : incoming){


Mime
View raw message