drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [3/3] git commit: Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.
Date Wed, 22 May 2013 01:39:07 GMT
Clean up threading of client/server.  Utilize command pattern for BitCom stuff to abstract away connection failures.  Works on one bit single exchange remote query now.  Next up, two bit single exchange query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8db98ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8db98ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8db98ad

Branch: refs/heads/execwork
Commit: b8db98ad7c159db3cf41a3866ff53013f87964b4
Parents: e57a8d6
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue May 21 18:38:56 2013 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Tue May 21 18:38:56 2013 -0700

----------------------------------------------------------------------
 .../drill/common/graph/AdjacencyListBuilder.java   |    2 +-
 .../org/apache/drill/exec/cache/LocalCache.java    |    2 +
 .../org/apache/drill/exec/client/DrillClient.java  |   77 ++++--
 .../drill/exec/coord/LocalClusterCoordinator.java  |    7 +-
 .../org/apache/drill/exec/ops/FragmentContext.java |   19 ++-
 .../org/apache/drill/exec/ops/QueryContext.java    |    4 +
 .../exec/physical/config/MockRecordReader.java     |    1 -
 .../drill/exec/physical/config/RandomReceiver.java |    5 -
 .../apache/drill/exec/physical/config/Screen.java  |    2 +-
 .../drill/exec/physical/impl/ScreenCreator.java    |   79 +++++-
 .../exec/physical/impl/SingleSenderCreator.java    |   41 +++-
 .../drill/exec/physical/impl/WireRecordBatch.java  |    8 +-
 .../impl/materialize/QueryWritableBatch.java       |    8 +
 .../impl/materialize/VectorRecordMaterializer.java |   11 +-
 .../drill/exec/planner/fragment/Materializer.java  |    8 +-
 .../exec/planner/fragment/SimpleParallelizer.java  |    4 +-
 .../exec/planner/fragment/StatsCollector.java      |    2 +-
 .../apache/drill/exec/record/RawFragmentBatch.java |    5 +
 .../drill/exec/rpc/AbstractHandshakeHandler.java   |    5 +-
 .../drill/exec/rpc/BaseRpcOutcomeListener.java     |   32 +++
 .../org/apache/drill/exec/rpc/BasicClient.java     |  176 ++++++++------
 .../drill/exec/rpc/BasicClientWithConnection.java  |    9 +-
 .../org/apache/drill/exec/rpc/BasicServer.java     |    7 +-
 .../rpc/ChannelListenerWithCoordinationId.java     |   25 ++
 .../apache/drill/exec/rpc/CoordinationQueue.java   |   96 +++++++--
 .../org/apache/drill/exec/rpc/DrillRpcFuture.java  |    2 -
 .../apache/drill/exec/rpc/DrillRpcFutureImpl.java  |   70 +-----
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |   88 ++++---
 .../apache/drill/exec/rpc/RpcCheckedFuture.java    |   33 +++
 .../drill/exec/rpc/RpcConnectionHandler.java       |   28 +++
 .../org/apache/drill/exec/rpc/RpcException.java    |   13 +
 .../java/org/apache/drill/exec/rpc/RpcOutcome.java |   26 ++
 .../apache/drill/exec/rpc/RpcOutcomeListener.java  |    7 +-
 .../exec/rpc/ZeroCopyProtobufLengthDecoder.java    |    2 +-
 .../org/apache/drill/exec/rpc/bit/BitClient.java   |   52 ++---
 .../java/org/apache/drill/exec/rpc/bit/BitCom.java |    8 +-
 .../org/apache/drill/exec/rpc/bit/BitComImpl.java  |  129 ++---------
 .../org/apache/drill/exec/rpc/bit/BitCommand.java  |   28 +++
 .../apache/drill/exec/rpc/bit/BitConnection.java   |   79 +-----
 .../drill/exec/rpc/bit/BitConnectionManager.java   |  175 +++++++++++---
 .../org/apache/drill/exec/rpc/bit/BitServer.java   |   60 ++++-
 .../org/apache/drill/exec/rpc/bit/BitTunnel.java   |  187 ++++-----------
 .../exec/rpc/bit/ConnectionManagerRegistry.java    |   73 ++++++
 .../drill/exec/rpc/bit/FutureBitCommand.java       |   78 ++++++
 .../apache/drill/exec/rpc/bit/ListenerPool.java    |   15 +-
 .../drill/exec/rpc/bit/ListeningBitCommand.java    |   73 ++++++
 .../drill/exec/rpc/user/QueryResultBatch.java      |    7 +
 .../drill/exec/rpc/user/QueryResultHandler.java    |  153 ++++++++++++
 .../org/apache/drill/exec/rpc/user/UserClient.java |  153 ++-----------
 .../drill/exec/rpc/user/UserResultsListener.java   |   11 +-
 .../org/apache/drill/exec/rpc/user/UserServer.java |   10 +-
 .../apache/drill/exec/server/BootStrapContext.java |    2 +-
 .../apache/drill/exec/server/RemoteServiceSet.java |    2 +
 .../apache/drill/exec/service/ServiceEngine.java   |    6 +-
 .../apache/drill/exec/work/EndpointListener.java   |    5 +-
 .../org/apache/drill/exec/work/FragmentRunner.java |   10 +-
 .../org/apache/drill/exec/work/WorkManager.java    |    7 +-
 .../exec/work/batch/AbstractFragmentCollector.java |    7 +-
 .../drill/exec/work/batch/BatchCollector.java      |    3 +-
 .../drill/exec/work/batch/BitComHandlerImpl.java   |    9 +-
 .../drill/exec/work/batch/IncomingBuffers.java     |   21 +-
 .../drill/exec/work/batch/MergingCollector.java    |    5 +-
 .../exec/work/batch/PartitionedCollector.java      |    1 +
 .../exec/work/batch/UnlmitedRawBatchBuffer.java    |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java    |   22 +-
 .../exec/work/foreman/RunningFragmentManager.java  |    8 +-
 .../work/fragment/IncomingFragmentHandler.java     |    2 +-
 .../exec/work/fragment/LocalFragmentHandler.java   |    4 +-
 .../exec/work/fragment/RemoteFragmentHandler.java  |    4 +-
 .../exec/physical/impl/DistributedFragmentRun.java |   17 +-
 .../org/apache/drill/exec/pop/CheckFragmenter.java |   21 +--
 .../org/apache/drill/exec/pop/FragmentChecker.java |   41 +++-
 .../org/apache/drill/exec/server/TestBitRpc.java   |   26 ++-
 .../test/resources/physical_single_exchange.json   |    1 -
 74 files changed, 1498 insertions(+), 923 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
index 1668477..4a385ce 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
@@ -57,7 +57,7 @@ import java.util.Map;
   }
 
   public AdjacencyList<V> getAdjacencyList() {
-    logger.debug("Values; {}", ops.values().toArray());
+//    logger.debug("Values; {}", ops.values().toArray());
     AdjacencyList<V> a = new AdjacencyList<V>();
 
     for (AdjacencyList<V>.Node from : ops.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index ddb2a02..b656f2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -43,11 +43,13 @@ public class LocalCache implements DistributedCache {
 
   @Override
   public PlanFragment getFragment(FragmentHandle handle) {
+    logger.debug("looking for fragment with handle: {}", handle);
     return handles.get(handle);
   }
 
   @Override
   public void storeFragment(PlanFragment fragment) {
+    logger.debug("Storing fragment: {}", fragment);
     handles.put(fragment.getHandle(), fragment);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bb7f77e..c35e834 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -30,22 +30,23 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.proto.UserProtos.RpcType;
-import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.rpc.user.UserRpcConfig;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
@@ -75,9 +76,6 @@ public class DrillClient implements Closeable{
   }
   
   
-  
-
-
   /**
    * Connects the client to a Drillbit server
    *
@@ -97,7 +95,9 @@ public class DrillClient implements Closeable{
     this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
     try {
       logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
-      this.client.connect(endpoint);
+      FutureHandler f = new FutureHandler();
+      this.client.connect(f, endpoint);
+      f.checkedGet();
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
@@ -120,34 +120,63 @@ public class DrillClient implements Closeable{
    * @throws RpcException
    */
   public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
-    try {
-      ListHoldingResultsListener listener = new ListHoldingResultsListener();
-      Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
-      f.get();
-      if(listener.ex != null){
-        throw listener.ex;
-      }else{
-        return listener.results;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      throw new RpcException(e);
-    }
+    ListHoldingResultsListener listener = new ListHoldingResultsListener();
+    client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
+    return listener.getResults();
+
   }
   
-  private class ListHoldingResultsListener extends UserResultsListener{
-    private RpcException ex;
+  private class ListHoldingResultsListener implements UserResultsListener {
     private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+    private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
     
     @Override
     public void submissionFailed(RpcException ex) {
       logger.debug("Submission failed.", ex);
-      this.ex = ex;
+      future.setException(ex);
     }
 
     @Override
     public void resultArrived(QueryResultBatch result) {
       logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", result.getHeader().getIsLastChunk(), result);
       results.add(result);
+      if(result.getHeader().getIsLastChunk()){
+        future.set(results);
+      }
+    }
+  
+    public List<QueryResultBatch> getResults() throws RpcException{
+      try{
+        return future.get();
+      }catch(Throwable t){
+        throw RpcException.mapException(t);
+      }
+    }
+  }
+  
+  private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{
+
+    protected FutureHandler() {
+      super( SettableFuture.<Void>create());
+    }
+
+    @Override
+    public void connectionSucceeded(ServerConnection connection) {
+      getInner().set(null);
+    }
+
+    @Override
+    public void connectionFailed(FailureType type, Throwable t) {
+      getInner().setException(new RpcException(String.format("Failure connecting to server. Failure of type %s.", type.name()), t));
+    }
+
+    private SettableFuture<Void> getInner(){
+      return (SettableFuture<Void>) delegate();
+    }
+    
+    @Override
+    protected RpcException mapException(Exception e) {
+      return RpcException.mapException(e);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
index 43a5430..f7b3549 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -29,17 +29,16 @@ import com.google.common.collect.Maps;
 public class LocalClusterCoordinator extends ClusterCoordinator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
 
-  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints;
+  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
   
   @Override
   public void close() throws IOException {
-    endpoints = null;
+    endpoints.clear();
   }
 
   @Override
   public void start(long millis) throws Exception {
     logger.debug("Local Cluster Coordinator started.");
-    endpoints = Maps.newConcurrentMap();
   }
 
   @Override
@@ -52,6 +51,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator{
 
   @Override
   public void unregister(RegistrationHandle handle) {
+    if(handle == null) return;
+    
     endpoints.remove(handle);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e64453c..33707a0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -52,7 +52,9 @@ public class FragmentContext {
   private final FragmentHandle handle;
   private final UserClientConnection connection;
   private final IncomingBuffers buffers;
-
+  private volatile Throwable failureCause;
+  private volatile boolean failed = false;
+  
   public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
     this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
     this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
@@ -65,9 +67,10 @@ public class FragmentContext {
   }
 
   public void fail(Throwable cause) {
-
+    logger.debug("Fragment Context received failure. {}", cause);
+    failed = true;
+    failureCause = cause;
   }
-
   
   public DrillbitContext getDrillbitContext(){
     return context;
@@ -107,4 +110,14 @@ public class FragmentContext {
   public IncomingBuffers getBuffers(){
     return buffers;
   }
+
+  public Throwable getFailureCause() {
+    return failureCause;
+  }
+  
+  public boolean isFailed(){
+    return failed;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fd24deb..1c251b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.server.DrillbitContext;
 
 public class QueryContext {
@@ -57,4 +58,7 @@ public class QueryContext {
     return drillbitContext.getPlanReader();
   }
   
+  public BitCom getBitCom(){
+    return drillbitContext.getBitCom();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index eaaeaa3..6a1eba4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -76,7 +76,6 @@ public class MockRecordReader implements RecordReader {
       int batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
-        logger.debug("Adding field {} of type {}", i, config.getTypes()[i]);
         valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
         output.addField(i, valueVectors[i]);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
index ed41586..6772fb0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -72,11 +72,6 @@ public class RandomReceiver extends AbstractReceiver{
     return new Size(1,1);
   }
 
-  @Override
-  public int getOppositeMajorFragmentId() {
-    return 0;
-  }
-
   
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 86a201d..688c6b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -64,7 +64,7 @@ public class Screen extends AbstractStore implements Root{
     // didn't get screwed up.
     if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
     DrillbitEndpoint endpoint = endpoints.iterator().next();
-    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+//    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
     if (!endpoint.equals(this.endpoint)) {
       throw new PhysicalOperatorSetupException(String.format(
           "A Screen operator can only be assigned to its home node.  Expected endpoint %s, Actual endpoint: %s",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c0711db..c20538d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,21 +17,32 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
 
 import com.google.common.base.Preconditions;
 
 public class ScreenCreator implements RootCreator<Screen>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
-
+  
+  
   @Override
   public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
     Preconditions.checkArgument(children.size() == 1);
@@ -40,7 +51,9 @@ public class ScreenCreator implements RootCreator<Screen>{
   
   
   private static class ScreenRoot implements RootExec{
-
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+    volatile boolean ok = true;
+    
     final RecordBatch incoming;
     final FragmentContext context;
     final UserClientConnection connection;
@@ -56,25 +69,53 @@ public class ScreenCreator implements RootCreator<Screen>{
     
     @Override
     public boolean next() {
+      if(!ok){
+        stop();
+        return false;
+      }
+      
       IterOutcome outcome = incoming.next();
-      boolean isLast = false;
+      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
-      case NONE:
-      case STOP:
-        connection.sendResult(materializer.convertNext(true));
-        context.batchesCompleted.inc(1);
-        context.recordsCompleted.inc(incoming.getRecordCount());
+      case STOP: {
+          QueryResult header1 = QueryResult.newBuilder() //
+              .setQueryId(context.getHandle().getQueryId()) //
+              .setRowCount(0) //
+              .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
+              .setDef(RecordBatchDef.getDefaultInstance()) //
+              .setIsLastChunk(true) //
+              .build();
+          QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+
+          connection.sendResult(listener, batch1);
+          return false;
+      }
+      case NONE: {
+        if(materializer == null){
+          // receive no results.
+          context.batchesCompleted.inc(1);
+          context.recordsCompleted.inc(incoming.getRecordCount());
+          QueryResult header2 = QueryResult.newBuilder() //
+              .setQueryId(context.getHandle().getQueryId()) //
+              .setRowCount(0) //
+              .setDef(RecordBatchDef.getDefaultInstance()) //
+              .setIsLastChunk(true) //
+              .build();
+          QueryWritableBatch batch2 = new QueryWritableBatch(header2);
+          connection.sendResult(listener, batch2);
+        }else{
+          connection.sendResult(listener, materializer.convertNext(true));
+        }
         return false;
-        
+      }
       case OK_NEW_SCHEMA:
         materializer = new VectorRecordMaterializer(context, incoming);
         // fall through.
-        // fall through
       case OK:
-        connection.sendResult(materializer.convertNext(false));
         context.batchesCompleted.inc(1);
         context.recordsCompleted.inc(incoming.getRecordCount());
-        return !isLast;
+        connection.sendResult(listener, materializer.convertNext(false));
+        return true;
       default:
         throw new UnsupportedOperationException();
       }
@@ -85,6 +126,20 @@ public class ScreenCreator implements RootCreator<Screen>{
       incoming.kill();
     }
 
+    private SendListener listener = new SendListener();
+    
+    private class SendListener extends BaseRpcOutcomeListener<Ack>{
+
+      @Override
+      public void failed(RpcException ex) {
+        logger.error("Failure while sending data to user.", ex);
+        ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+        ok = false;
+      }
+      
+    }
     
   }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 60c2d78..b7d4c7e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -23,9 +23,12 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.bit.BitTunnel;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -45,9 +48,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private FragmentHandle handle;
     private int recMajor;
     private FragmentContext context;
+    private volatile boolean ok = true;
     
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
-      logger.debug("Creating single sender root exec base on config: {}", config);
       this.incoming = batch;
       this.handle = context.getHandle();
       this.recMajor = config.getOppositeMajorFragmentId();
@@ -57,20 +60,24 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     
     @Override
     public boolean next() {
+      if(!ok){
+        incoming.kill();
+        
+        return false;
+      }
       IterOutcome out = incoming.next();
       logger.debug("Outcome of sender next {}", out);
       switch(out){
       case STOP:
       case NONE:
-        FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
-        tunnel.sendRecordBatch(context, b2);
+        FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
         return false;
-        
 
       case OK:
       case OK_NEW_SCHEMA:
-        FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
-        tunnel.sendRecordBatch(context, batch);
+        FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
         return true;
 
       case NOT_YET:
@@ -81,9 +88,31 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     @Override
     public void stop() {
+      ok = false;
     }
     
     
+    private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
+
+      @Override
+      public void failed(RpcException ex) {
+        context.fail(ex);
+        stop();
+      }
+
+      @Override
+      public void success(Ack value) {
+        if(value.getOk()) return;
+        
+        logger.error("Downstream fragment was not accepted.  Stopping future sends.");
+        // if we didn't get ack ok, we'll need to kill the query.
+        context.fail(new RpcException("A downstream fragment batch wasn't accepted.  This fragment thus fails."));
+        stop();
+      }
+      
+    }
     
   }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fc7f833..b41b0cd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -38,6 +38,7 @@ public class WireRecordBatch implements RecordBatch{
   private RecordBatchLoader batchLoader;
   private RawFragmentBatchProvider fragProvider;
   private FragmentContext context;
+  private BatchSchema schema;
 
   
   public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
@@ -53,7 +54,7 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public BatchSchema getSchema() {
-    return null;
+    return schema;
   }
 
   @Override
@@ -73,13 +74,16 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public IterOutcome next() {
-    RawFragmentBatch batch = this.fragProvider.getNext();
+    RawFragmentBatch batch = fragProvider.getNext();
     try{
       if(batch == null) return IterOutcome.NONE;
 
+      logger.debug("Next received batch {}", batch);
+
       RecordBatchDef rbd = batch.getHeader().getDef();
       boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       if(schemaChanged){
+        this.schema = batchLoader.getSchema();
         return IterOutcome.OK_NEW_SCHEMA;
       }else{
         return IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 187e6e9..e8ed48a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.materialize;
 
+import java.util.Arrays;
+
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
@@ -42,5 +44,11 @@ public class QueryWritableBatch {
   public QueryResult getHeader() {
     return header;
   }
+
+  @Override
+  public String toString() {
+    return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
+  }
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index e2d2eb9..7929296 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.materialize;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
@@ -33,10 +34,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
   public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
     this.queryId = context.getHandle().getQueryId();
     this.batch = batch;
-
-    for (MaterializedField f : batch.getSchema()) {
-      logger.debug("New Field: {}", f);
-    }
+    BatchSchema schema = batch.getSchema();
+    assert schema != null : "Schema must be defined.";
+    
+//    for (MaterializedField f : batch.getSchema()) {
+//      logger.debug("New Field: {}", f);
+//    }
   }
 
   public QueryWritableBatch convertNext(boolean isLast) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 9fee586..da71271 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -41,13 +41,13 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
       // this is a sending exchange.
       PhysicalOperator child = exchange.getChild().accept(this, iNode);
       PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
-      logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
+//      logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
       return materializedSender;
       
     }else{
       // receiving exchange.
       PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
-      logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
+//      logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
       return materializedReceiver;
     }
   }
@@ -63,7 +63,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
     
     try {
       PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
-      logger.debug("New materialized store node {} with child {}", o, child);
+//      logger.debug("New materialized store node {} with child {}", o, child);
       return o;
     } catch (PhysicalOperatorSetupException e) {
       throw new FragmentSetupException("Failure while generating a specific Store materialization.");
@@ -72,7 +72,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
 
   @Override
   public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
-    logger.debug("Visiting catch all: {}", op);
+//    logger.debug("Visiting catch all: {}", op);
     List<PhysicalOperator> children = Lists.newArrayList();
     for(PhysicalOperator child : op){
       children.add(child.accept(this, iNode));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index fc03a23..8adb447 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -145,7 +145,7 @@ public class SimpleParallelizer {
       // figure out width.
       int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
       float diskCost = stats.getDiskCost();
-      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+//      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
 
       // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
       // of tasks or the maximum width of the fragment.
@@ -154,7 +154,7 @@ public class SimpleParallelizer {
       }
 
       if (width < 1) width = 1;
-      logger.debug("Setting width {} on fragment {}", width, wrapper);
+//      logger.debug("Setting width {} on fragment {}", width, wrapper);
       wrapper.setWidth(width);
       // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
       wrapper.assignEndpoints(allNodes);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index d53a78c..af8ec04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -41,7 +41,7 @@ public class StatsCollector {
 
     Wrapper wrapper = planningSet.get(n);
     n.getRoot().accept(opStatCollector, wrapper);
-    logger.debug("Set stats to {}", wrapper.getStats());
+//    logger.debug("Set stats to {}", wrapper.getStats());
     // receivers...
     for (ExchangeFragmentPair child : n) {
       // get the fragment node that feeds this node.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index c244cea..4f87224 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -41,4 +41,9 @@ public class RawFragmentBatch {
     return body;
   }
 
+  @Override
+  public String toString() {
+    return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 859d385..ea591da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -25,8 +25,7 @@ import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
-    ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
 
   protected final EnumLite handshakeType;
@@ -41,7 +40,7 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
 
   @Override
   public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
-    coordinationId = inbound.coordinationId;
+    this.coordinationId = inbound.coordinationId;
     ctx.channel().pipeline().remove(this);
     if (inbound.rpcType != handshakeType.getNumber())
       throw new RpcException(String.format("Handshake failure.  Expected %s[%d] but received number [%d]",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
new file mode 100644
index 0000000..1dab1c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
+
+  @Override
+  public void failed(RpcException ex) {
+  }
+
+  @Override
+  public void success(T value) {
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0ff2b9d..0afc5d0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -29,22 +29,30 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
+
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
+    extends RpcBus<T, R> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
-  private Bootstrap b;
+  private final Bootstrap b;
   private volatile boolean connect = false;
   protected R connection;
-  private EventLoopGroup eventLoop;
+  private final T handshakeType;
+  private final Class<HANDSHAKE_RESPONSE> responseClass;
+  private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
 
-  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
     super(rpcMapping);
-    this.eventLoop = eventLoopGroup;
+    this.responseClass = responseClass;
+    this.handshakeType = handshakeType;
+    this.handshakeParser = handshakeParser;
     
     b = new Bootstrap() //
         .group(eventLoopGroup) //
@@ -59,12 +67,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             logger.debug("initializing client connection.");
             connection = initRemoteConnection(ch);
             ch.closeFuture().addListener(getCloseHandler(connection));
-
+            
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
                 new RpcDecoder(rpcConfig.getName()), //
                 new RpcEncoder(rpcConfig.getName()), //
-                getHandshakeHandler(), //
+                new ClientHandshakeHandler(), //
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );
@@ -75,26 +83,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     ;
   }
 
-  protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
-
-  protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
-    private Class<T> responseType;
-
-    public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
-      super(handshakeType, parser);
-      this.responseType = responseType;
-    }
-
-    @Override
-    protected final void consumeHandshake(Channel c, T msg) throws Exception {
-      validateHandshake(msg);
-      queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
-    }
-
-    protected abstract void validateHandshake(T msg) throws Exception;
-
-  }
-
+  protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
+  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
+  
   protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
     return new ChannelClosedHandler();
   }
@@ -105,6 +96,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         "This shouldn't be used in client mode as a client only has a single connection.");
   }
 
+  protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
+      T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+    super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
+  }
+
   protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
     return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
@@ -115,65 +111,91 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return true;
   }
 
-  /**
-   * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom.  Should be cleaned up.
-   */
-  private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
-    final SettableFuture<RECEIVE> future;
-    T handshakeType;
-    SEND handshakeValue;
-    String host;
-    int port;
-    Class<RECEIVE> responseClass;
-
-    public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
-      super();
-      assert host != null && !host.isEmpty();
-      assert port > 0;
-      logger.debug("Creating new handshake thread to connec to {}:{}", host, port);
-      this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
-      future = SettableFuture.create();
-      this.handshakeType = handshakeType;
+  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+    ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
+    b.connect(host, port).addListener(cml.connectionHandler);
+  }
+
+  private class ConnectionMultiListener {
+    private final RpcConnectionHandler<R> l;
+    private final HANDSHAKE_SEND handshakeValue;
+
+    public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
+      assert l != null;
+      assert handshakeValue != null;
+          
+      this.l = l;
       this.handshakeValue = handshakeValue;
-      this.host = host;
-      this.port = port;
-      this.responseClass = responseClass;
     }
 
-    @Override
-    public void run() {
-      try {
-        logger.debug("Starting to get client connection on host {}, port {}.", host, port);
-        
-        ChannelFuture f = b.connect(host, port);
-        f.sync();
-        if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
-        connect = !connect;
-        logger.debug("Client connected, sending handshake.");
-        DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
-        future.set(fut.checkedGet());
-        logger.debug("Got bit client connection.");
-      } catch (Exception e) {
-        logger.debug("Failed to get client connection.", e);
-        future.setException(e);
+    public final ConnectionHandler connectionHandler = new ConnectionHandler();
+    public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
+
+    /**
+     * Manages connection establishment outcomes.
+     */
+    private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
+
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+//        logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
+        try {
+          future.get();
+          if (future.isSuccess()) {
+            send(handshakeSendHandler, handshakeType, handshakeValue, responseClass);
+          } else {
+            l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+          }
+//          logger.debug("Handshake queued for send.");
+        } catch (Exception ex) {
+          l.connectionFailed(FailureType.CONNECTION, ex);
+        }
       }
     }
 
+    /**
+     * manages handshake outcomes.
+     */
+    private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
+
+      @Override
+      public void failed(RpcException ex) {
+        logger.debug("Failure while initiating handshake", ex);
+        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+      }
+
+      @Override
+      public void success(HANDSHAKE_RESPONSE value) {
+//        logger.debug("Handshake received. {}", value);
+        try {
+          BasicClient.this.validateHandshake(value);
+          BasicClient.this.finalizeConnection(value, connection);
+          BasicClient.this.connect = true;
+          l.connectionSucceeded(connection);
+//          logger.debug("Handshake completed succesfully.");
+        } catch (RpcException ex) {
+          l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
+        }
+      }
+
+    }
+
   }
 
-  protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
-      SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
-      RpcException {
-    
-    
-    HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
-    ht.start();
-    try{
-      return ht.future.get();  
-    }catch(Exception e){
-      throw new RpcException(e);
+  private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
+
+    public ClientHandshakeHandler() {
+      super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
     }
-    
+
+    @Override
+    protected final void consumeHandshake(Channel c, HANDSHAKE_RESPONSE msg) throws Exception {
+      // remove the handshake information from the queue so it doesn't sit there forever.
+      RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
+          responseClass);
+      response.set(msg);
+    }
+
   }
 
   public void close() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 0e62f14..2028db6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -26,13 +26,16 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import com.google.protobuf.Internal.EnumLite;
 
-public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
 
-  public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(rpcMapping, alloc, eventLoopGroup);
+  public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+    super(rpcMapping, alloc, eventLoopGroup, handshakeType, responseClass, handshakeParser);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 52bb0a2..af5d9c9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -72,7 +72,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
                 new ZeroCopyProtobufLengthDecoder(), //
                 new RpcDecoder(rpcConfig.getName()), //
                 new RpcEncoder(rpcConfig.getName()), //
-                getHandshakeHandler(),
+                getHandshakeHandler(connection),
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );            
@@ -88,7 +88,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   }
 
   
-  protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
+  protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
 
   protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
 
@@ -104,9 +104,6 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
     
     public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
     
-
-      
-    
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
new file mode 100644
index 0000000..27e9dee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public interface ChannelListenerWithCoordinationId extends GenericFutureListener<ChannelFuture>{
+  public int getCoordinationId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 70142bb..9edbe11 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -17,8 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 
@@ -29,31 +33,93 @@ public class CoordinationQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
 
   private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, DrillRpcFutureImpl<?>> map;
+  private final Map<Integer, RpcOutcome<?>> map;
 
   public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
+    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
   }
 
-  void channelClosed(Exception ex) {
-    for (DrillRpcFutureImpl<?> f : map.values()) {
-      f.setException(ex);
+  void channelClosed(Throwable ex) {
+    if(ex != null){
+      RpcException e;
+      if(ex instanceof RpcException){
+        e = (RpcException) ex;
+      }else{
+        e = new RpcException(ex);  
+      }
+      for (RpcOutcome<?> f : map.values()) {
+        f.setException(e);
+      }
     }
   }
 
-  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
     int i = circularInt.getNext();
-    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
-    // logger.debug("Writing to map coord {}, future {}", i, future);
+    RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
     Object old = map.put(i, future);
     if (old != null)
       throw new IllegalStateException(
           "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
     return future;
   }
+  
+  private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
+    final RpcOutcomeListener<T> handler;
+    final Class<T> clazz;
+    final int coordinationId;
+    
+    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+      super();
+      this.handler = handler;
+      this.clazz = clazz;
+      this.coordinationId = coordinationId;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if(!future.isSuccess()){
+        removeFromMap(coordinationId);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void set(Object value) {
+      assert clazz.isAssignableFrom(value.getClass());
+      handler.success( (T) value);
+    }
+
+    @Override
+    public void setException(Throwable t) {
+      handler.failed(RpcException.mapException(t));
+    }
+
+    @Override
+    public Class<T> getOutcomeType() {
+      return clazz;
+    }
+
+    @Override
+    public int getCoordinationId() {
+      return coordinationId;
+    }
+    
+    
+  }
+//  
+//  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+//    int i = circularInt.getNext();
+//    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
+//    // logger.debug("Writing to map coord {}, future {}", i, future);
+//    Object old = map.put(i, future);
+//    if (old != null)
+//      throw new IllegalStateException(
+//          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
+//    return future;
+//  }
 
-  private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
-    DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
+  private RpcOutcome<?> removeFromMap(int coordinationId) {
+    RpcOutcome<?> rpc = map.remove(coordinationId);
     if (rpc == null) {
       logger.error("Rpc is null.");
       throw new IllegalStateException(
@@ -62,11 +128,11 @@ public class CoordinationQueue {
     return rpc;
   }
 
-  public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
+  public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
     // logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
-    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+    RpcOutcome<?> rpc = removeFromMap(coordinationId);
     // logger.debug("Got rpc from map {}", rpc);
-    Class<?> outcomeClass = rpc.getOutcomeClass();
+    Class<?> outcomeClass = rpc.getOutcomeType();
 
     if (outcomeClass != clazz) {
 
@@ -80,7 +146,7 @@ public class CoordinationQueue {
     }
 
     @SuppressWarnings("unchecked")
-    DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
+    RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
 
     // logger.debug("Returning casted future");
     return crpc;
@@ -88,7 +154,7 @@ public class CoordinationQueue {
 
   public void updateFailedFuture(int coordinationId, RpcFailure failure) {
     // logger.debug("Updating failed future.");
-    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+    RpcOutcome<?> rpc = removeFromMap(coordinationId);
     rpc.setException(new RemoteRpcException(failure));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index bae947a..9033ea1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -21,6 +21,4 @@ import com.google.common.util.concurrent.CheckedFuture;
 
 public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
-
-  public void addLightListener(RpcOutcomeListener<T> outcomeListener);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index ee14eeb..d5d3a9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -22,22 +22,12 @@ import java.util.concurrent.ExecutionException;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 
-class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
 
-  final int coordinationId;
-  private final Class<V> clazz;
-
-  public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
-    super(delegate);
-    this.coordinationId = coordinationId;
-    this.clazz = clazz;
-  }
-
-  public Class<V> getOutcomeClass(){
-    return clazz;
+  public DrillRpcFutureImpl() {
+    super(new InnerFuture<V>());
   }
   
   /**
@@ -53,24 +43,7 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
 
   @Override
   protected RpcException mapException(Exception ex) {
-    Throwable e = ex;
-    while(e instanceof ExecutionException){
-      e = e.getCause();
-    }
-    if (e instanceof RpcException)  return (RpcException) e;
-
-    return new RpcException(ex);
-
-  }
-
-  @SuppressWarnings("unchecked")
-  void setValue(Object value) {
-    assert clazz.isAssignableFrom(value.getClass());
-    ((InnerFuture<V>) super.delegate()).setValue((V) value);
-  }
-
-  boolean setException(Throwable t) {
-    return ((InnerFuture<V>) super.delegate()).setException(t);
+    return RpcException.mapException(ex);
   }
 
   public static class InnerFuture<T> extends AbstractFuture<T> {
@@ -85,34 +58,17 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
     }
   }
 
-  public class RpcOutcomeListenerWrapper implements Runnable{
-    final RpcOutcomeListener<V> inner;
-    
-    public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
-      super();
-      this.inner = inner;
-    }
-
-    @Override
-    public void run() {
-      try{
-        inner.success(DrillRpcFutureImpl.this.checkedGet());
-      }catch(RpcException e){
-        inner.failed(e);
-      }
-    }
-  }
-  
-  public void addLightListener(RpcOutcomeListener<V> outcomeListener){
-    this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+  @Override
+  public void failed(RpcException ex) {
+    ( (InnerFuture<V>)delegate()).setException(ex);
   }
-  
-  
-  
-  public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
-    InnerFuture<V> f = new InnerFuture<V>();
-    return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+
+  @Override
+  public void success(V value) {
+    ( (InnerFuture<V>)delegate()).setValue(value);
   }
 
 
+  
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 11764db..a680a97 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -64,6 +64,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+    DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
+    this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
+    return rpcFuture;
+  }  
+  
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  
+    
+
 
     assert !Arrays.asList(dataBodies).contains(null);
     assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
@@ -72,14 +82,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     boolean completed = false;
 
     try {
-      // logger.debug("Seding message");
       Preconditions.checkNotNull(protobufBody);
-      DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
-      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
       ChannelFuture channelFuture = connection.getChannel().write(m);
-      channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
+      channelFuture.addListener(futureListener);
       completed = true;
-      return rpcFuture;
     } finally {
       if (!completed) {
         if (pBuffer != null) pBuffer.release();
@@ -140,10 +148,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       case RESPONSE:
         MessageLite m = getResponseDefaultInstance(msg.rpcType);
         assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
-        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+        RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        rpcFuture.setValue(value);
+        rpcFuture.set(value);
         if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
 
         break;
@@ -162,39 +170,39 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   }
 
-  private class Listener implements GenericFutureListener<ChannelFuture> {
-
-    private int coordinationId;
-    private Class<?> clazz;
-
-    public Listener(int coordinationId, Class<?> clazz) {
-      this.coordinationId = coordinationId;
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) throws Exception {
-      // logger.debug("Completed channel write.");
-
-      if (channelFuture.isCancelled()) {
-        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
-        rpcFuture.setException(new CancellationException("Socket operation was canceled."));
-      } else if (!channelFuture.isSuccess()) {
-        try {
-          channelFuture.get();
-          throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
-        } catch (Exception e) {
-          logger.error("Error occurred during Rpc", e);
-          DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
-          rpcFuture.setException(e);
-        }
-      } else {
-        // send was successful. No need to modify DrillRpcFuture.
-        return;
-      }
-    }
-
-  }
+//  private class Listener implements GenericFutureListener<ChannelFuture> {
+//
+//    private int coordinationId;
+//    private Class<?> clazz;
+//
+//    public Listener(int coordinationId, Class<?> clazz) {
+//      this.coordinationId = coordinationId;
+//      this.clazz = clazz;
+//    }
+//
+//    @Override
+//    public void operationComplete(ChannelFuture channelFuture) throws Exception {
+//      // logger.debug("Completed channel write.");
+//
+//      if (channelFuture.isCancelled()) {
+//        RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+//        rpcFuture.setException(new CancellationException("Socket operation was canceled."));
+//      } else if (!channelFuture.isSuccess()) {
+//        try {
+//          channelFuture.get();
+//          throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
+//        } catch (Exception e) {
+//          logger.error("Error occurred during Rpc", e);
+//          RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+//          rpcFuture.setException(e);
+//        }
+//      } else {
+//        // send was successful. No need to modify DrillRpcFuture.
+//        return;
+//      }
+//    }
+//
+//  }
 
   public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
new file mode 100644
index 0000000..7c300d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
+  public RpcCheckedFuture(ListenableFuture<T> delegate) {
+    super(delegate);
+  }
+
+  @Override
+  protected RpcException mapException(Exception e) {
+    return RpcException.mapException(e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
new file mode 100644
index 0000000..0f55488
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcConnectionHandler<T extends RemoteConnection> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
+  
+  public static enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION}
+  
+  public void connectionSucceeded(T connection);
+  public void connectionFailed(FailureType type, Throwable t);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index ca66481..500f959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import java.util.concurrent.ExecutionException;
+
 import org.apache.drill.common.exceptions.DrillIOException;
 
 /**
@@ -41,5 +43,16 @@ public class RpcException extends DrillIOException{
     super(cause);
   }
   
+  public static RpcException mapException(Throwable t){
+    while(t instanceof ExecutionException) t = ((ExecutionException)t).getCause();
+    if(t instanceof RpcException) return ((RpcException) t);
+    return new RpcException(t);
+  }
+  
+  public static RpcException mapException(String message, Throwable t){
+    while(t instanceof ExecutionException) t = ((ExecutionException)t).getCause();
+    return new RpcException(message, t);
+  }
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
new file mode 100644
index 0000000..a25e5e7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcOutcome<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class);
+  
+  public void set(Object value);
+  public void setException(Throwable t);
+  public Class<T> getOutcomeType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index fac908c..771edcf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -17,11 +17,10 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-public abstract class RpcOutcomeListener<V> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+public interface RpcOutcomeListener<V> {
   
-  public void failed(RpcException ex){};
-  public void success(V value){};
+  public void failed(RpcException ex);
+  public void success(V value);
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 20a7d7d..318abb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -36,7 +36,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
   protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
 
     if(!ctx.channel().isOpen()){
-      logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+      if(in.readableBytes() > 0) logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
       in.skipBytes(in.readableBytes());
       return;
     }


Mime
View raw message