drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/3] Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.
Date Wed, 22 May 2013 01:39:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index 4ba99a1..82a6aa6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -22,57 +22,54 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.rpc.BasicClient;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
 import com.google.protobuf.MessageLite;
 
-public class BitClient  extends BasicClient<RpcType, BitConnection>{
+public class BitClient  extends BasicClient<RpcType, BitConnection, BitHandshake, BitHandshake>{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
 
   private final BitComHandler handler;
-  private final DrillbitEndpoint endpoint;
-  private BitConnection connection;
-  private final AvailabilityListener openListener;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final DrillbitEndpoint remoteEndpoint;
+  private volatile BitConnection connection;
   private final ListenerPool listeners;
+  private final CloseHandlerCreator closeHandlerFactory;
+  private final DrillbitEndpoint localIdentity;
   
-  public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
-    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
-
-    this.endpoint = endpoint;
+  public BitClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, BitComHandler handler, BootStrapContext context, CloseHandlerCreator closeHandlerFactory, ListenerPool listeners) {
+    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER);
+    this.localIdentity = localEndpoint;
+    this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;
-    this.openListener = openListener;
-    this.registry = registry;
     this.listeners = listeners;
+    this.closeHandlerFactory = closeHandlerFactory;
   }
   
-  public BitHandshake connect() throws RpcException, InterruptedException{
-    BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
-    connection.setEndpoint(endpoint);
-    return bs;
+  public void connect(RpcConnectionHandler<BitConnection> connectionHandler) {
+    connectAsClient(connectionHandler, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getBitPort());
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+    this.connection = new BitConnection(channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, listeners);
     return connection;
   }
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
-    return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
   }
 
   @Override
@@ -86,18 +83,15 @@ public class BitClient  extends BasicClient<RpcType, BitConnection>{
   }
 
   @Override
-  protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
-
-      @Override
-      protected void validateHandshake(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit server to bit client. {}", inbound);
-        if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
-      }
+  protected void validateHandshake(BitHandshake handshake) throws RpcException {
+    if(handshake.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", handshake.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitHandshake handshake, BitConnection connection) {
+    connection.setEndpoint(handshake.getEndpoint());
   }
-  
+
   public BitConnection getConnection(){
     return this.connection;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index c60d36b..f7f508e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -40,11 +40,17 @@ public interface BitCom extends Closeable {
    */
   public BitTunnel getTunnel(DrillbitEndpoint node) ;
 
-  public int start() throws InterruptedException, DrillbitStartupException;
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
 
   /**
    * Register an incoming batch handler for a local foreman.  
    * @param handler
    */
   public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
+  
+  /**
+   * Get ListenerPool
+   * @return
+   */
+  public ListenerPool getListeners();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index c98be44..d1cadc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -18,157 +18,68 @@
 package org.apache.drill.exec.rpc.bit;
 
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Manages communication tunnels between nodes.   
+ * Manages communication tunnels between nodes.
  */
 public class BitComImpl implements BitCom {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
 
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
   private final ListenerPool listeners;
   private volatile BitServer server;
   private final BitComHandler handler;
   private final BootStrapContext context;
-  
-  // TODO: this executor should be removed.
-  private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
+  private final ConnectionManagerRegistry connectionRegistry;
 
   public BitComImpl(BootStrapContext context, BitComHandler handler) {
     super();
     this.handler = handler;
     this.context = context;
     this.listeners = new ListenerPool(8);
+    this.connectionRegistry = new ConnectionManagerRegistry(handler, context, listeners);
   }
 
-  public int start() throws InterruptedException, DrillbitStartupException {
-    server = new BitServer(handler, context, registry, listeners);
+  @Override
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+    server = new BitServer(handler, context, connectionRegistry, listeners);
     int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
-    return server.bind(port);
-  }
-
-  private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
-    
-    
-    SettableFuture<BitConnection> future = SettableFuture.create();
-    BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
-    BitConnection t = null;
-
-    if (check) {
-      t = registry.get(endpoint);
-
-      if (t != null) {
-        future.set(t);
-        return checkedFuture;
-      }
-    }
-    
-    try {
-      AvailWatcher watcher = new AvailWatcher(future);
-      BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
-      c.connect();
-      return checkedFuture;
-    } catch (InterruptedException | RpcException e) {
-      future.setException(new FragmentSetupException("Unable to open connection"));
-      return checkedFuture;
-    }
-
+    port = server.bind(port);
+    DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setBitPort(port).build();
+    connectionRegistry.setEndpoint(completeEndpoint);
+    return completeEndpoint;
   }
 
-  private class AvailWatcher implements AvailabilityListener{
-    final SettableFuture<BitConnection> future;
-    
-    public AvailWatcher(SettableFuture<BitConnection> future) {
-      super();
-      this.future = future;
-    }
-
-    @Override
-    public void isAvailable(BitConnection connection) {
-      future.set(connection);
-    }
-    
-  }
   
-  BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
-    BitConnection t = registry.get(endpoint);
-    if(t != null) return t;
-    return this.getNode(endpoint, false).checkedGet();
+   
+  public ListenerPool getListeners() {
+    return listeners;
   }
 
-  
-  CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
-    return this.getNode(endpoint, true);
-  }
-
-  
   @Override
-  public BitTunnel getTunnel(DrillbitEndpoint endpoint){
-    BitConnection t = registry.get(endpoint);
-    if(t == null){
-      return new BitTunnel(exec, endpoint, this, t);
-    }else{
-      return new BitTunnel(exec, endpoint, this,  this.getNode(endpoint, false));
-    }
+  public BitTunnel getTunnel(DrillbitEndpoint endpoint) {
+    return new BitTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
   }
 
-
-  /**
-   * A future which remaps exceptions to a BitComException.
-   * @param <T>
-   */
-  private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
-
-    protected BitComFuture(ListenableFuture<T> delegate) {
-      super(delegate);
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if(e instanceof ExecutionException){
-        t = e.getCause();
-      }
-      
-      if(t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
+  @Override
+  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+    this.handler.registerIncomingFragmentHandler(handler);
   }
 
   public void close() {
     Closeables.closeQuietly(server);
-    for (BitConnection bt : registry.values()) {
-      bt.shutdownIfClient();
+    for (BitConnectionManager bt : connectionRegistry) {
+      bt.close();
     }
   }
 
-  @Override
-  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
-    this.handler.registerIncomingFragmentHandler(handler);
-  }
-  
-  
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
new file mode 100644
index 0000000..692c63e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+
+import com.google.protobuf.MessageLite;
+
+public interface BitCommand<T extends MessageLite> extends RpcConnectionHandler<BitConnection>{
+
+  public abstract void connectionAvailable(BitConnection connection);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
index 73980f9..f85ea74 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -35,31 +36,35 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
 
 public class BitConnection extends RemoteConnection{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class); 
   
   private final RpcBus<RpcType, BitConnection> bus;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
-
-  private final AvailabilityListener listener;
   private volatile DrillbitEndpoint endpoint;
   private volatile boolean active = false;
   private final UUID id;
   
-  public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+  public BitConnection(Channel channel, RpcBus<RpcType, BitConnection> bus, ListenerPool listeners){
     super(channel);
     this.bus = bus;
-    this.registry = registry;
     // we use a local listener pool unless a global one is provided.
     this.listeners = listeners != null ? listeners : new ListenerPool(2);
-    this.listener = listener;
     this.id = UUID.randomUUID();
   }
+  
+  void setEndpoint(DrillbitEndpoint endpoint){
+    assert this.endpoint == null : "Endpoint should only be set once (only in the case in incoming server requests).";
+    this.endpoint = endpoint;
+    active = true;
+  }
 
   protected DrillbitEndpoint getEndpoint() {
     return endpoint;
@@ -69,48 +74,12 @@ public class BitConnection extends RemoteConnection{
     return listeners;
   }
   
-  protected void setEndpoint(DrillbitEndpoint endpoint) {
-    Preconditions.checkNotNull(endpoint);
-    Preconditions.checkArgument(this.endpoint == null);
-    
-    this.endpoint = endpoint;
-    BitServer.logger.debug("Adding new endpoint to available BitServer connections.  Endpoint: {}.", endpoint);
-    synchronized(this){
-      BitConnection c = registry.putIfAbsent(endpoint, this);
-      
-      if(c != null){ // the registry already has a connection like this
-        
-        // give the awaiting future an alternative connection.
-        if(listener != null){
-          listener.isAvailable(c);
-        }
-        
-        // shut this down if this is a client as it won't be available in the registry.
-        // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other.  This shouldn't cause a problem.
-        logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
-        shutdownIfClient();
-        
-      }
-      active = true;
-      if(listener != null) listener.isAvailable(this);
-    }
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
-    return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
-  }
-  
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
-    return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
-  }
   
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
-    return bus.send(this,  RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){
+    bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
   }
   
-  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return bus.send(this,  RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
-  }
 
   public void disable(){
     active = false;
@@ -140,27 +109,7 @@ public class BitConnection extends RemoteConnection{
     return true;
   }
 
-  public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
-    return new CloseHandler(this, parent);
-  }
-  
-  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
-    private BitConnection connection;
-    private GenericFutureListener<ChannelFuture> parent;
-    
-    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
-      super();
-      this.connection = connection;
-      this.parent = parent;
-    }
 
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
-      parent.operationComplete(future);
-    }
-    
-  }
   
   public void shutdownIfClient(){
     if(bus.isClient()) Closeables.closeQuietly(bus);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
index 0160d24..d99bb22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -17,58 +17,152 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
 
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.protobuf.MessageLite;
 
-public class BitConnectionManager {
+/**
+ * Manager all connections between two particular bits.
+ */
+public class BitConnectionManager implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
   
-  private final int maxAttempts;
-  private final BitComImpl com;
   private final DrillbitEndpoint endpoint;
-  private final AtomicReference<BitConnection> connection;
-  private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+  private final AtomicReference<BitConnection> connectionHolder;
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private final DrillbitEndpoint localIdentity;
+  
+  public BitConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    assert remoteEndpoint != null : "Endpoint cannot be null.";
+    assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
+    assert remoteEndpoint.getBitPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k.  Was set to %d.", remoteEndpoint.getBitPort());
+    
+    this.connectionHolder =  new AtomicReference<BitConnection>();
+    this.endpoint = remoteEndpoint;
+    this.localIdentity = localIdentity;
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+  
+  public <R extends MessageLite> BitCommand<R> runCommand(BitCommand<R> cmd){
+    logger.debug("Running command {}", cmd);
+    BitConnection connection = connectionHolder.get();
+    if(connection != null){
+      if(connection.isActive()){
+        cmd.connectionAvailable(connection);
+        return cmd;
+      }else{
+        // remove the old connection. (don't worry if we fail since someone else should have done it.
+        connectionHolder.compareAndSet(connection, null);
+      }
+    }
+    
+    /** We've arrived here without a connection, let's make sure only one of us makes a connection. (fyi, another endpoint could create a reverse connection **/
+    synchronized(this){
+      connection = connectionHolder.get();
+      if(connection != null){
+        cmd.connectionAvailable(connection);
+      }else{
+        BitClient client = new BitClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator(), listenerPool);
+        
+        client.connect(new ConnectionListeningDecorator(cmd, !endpoint.equals(localIdentity)));
+      }
+      return cmd;
+      
+    }
+  }
+  
+  CloseHandlerCreator getCloseHandlerCreator(){
+    return new CloseHandlerCreator();
+  }
 
-  BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
-    assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
-    this.com = com;
-    this.connection =  new AtomicReference<BitConnection>(connection);
-    this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
-    this.endpoint = endpoint;
-    this.maxAttempts = maxAttempts;
+  /** Factory for close handlers **/
+  class CloseHandlerCreator{
+    public GenericFutureListener<ChannelFuture> getHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent){
+      return new CloseHandler(connection, parent);
+    }
   }
   
-  BitConnection getConnection(int attempt) throws RpcException{
-    BitConnection con = connection.get();
+  
+  
+  /**
+   * Listens for connection closes and clears connection holder.
+   */
+  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+    private BitConnection connection;
+    private GenericFutureListener<ChannelFuture> parent;
     
-    if(con != null){
-      if(con.isActive()) return con;
-      connection.compareAndSet(con, null);
+    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+      super();
+      this.connection = connection;
+      this.parent = parent;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      connectionHolder.compareAndSet(connection, null);
+      parent.operationComplete(future);
     }
     
-    CheckedFuture<BitConnection, RpcException> fut = future.get();
+  } 
+  
+  /**
+   * Decorate a connection creation so that we capture a success and keep it available for future requests.  If we have raced and another is already available... we return that one and close things down on this one.
+   */
+  private class ConnectionListeningDecorator implements RpcConnectionHandler<BitConnection>{
+
+    private final RpcConnectionHandler<BitConnection> delegate;
+    private final boolean closeOnDupe;
+    
+    public ConnectionListeningDecorator(RpcConnectionHandler<BitConnection> delegate,  boolean closeOnDupe) {
+      this.delegate = delegate;
+      this.closeOnDupe = closeOnDupe;
+    }
 
-    if(fut != null){
-      try{
-        return fut.checkedGet();
-      }catch(RpcException ex){
-        future.compareAndSet(fut, null);
-        if(attempt < maxAttempts){
-          return getConnection(attempt + 1);
-        }else{
-          throw ex;
+    @Override
+    public void connectionSucceeded(BitConnection incoming) {
+      BitConnection connection = connectionHolder.get();
+      while(true){
+        boolean setted = connectionHolder.compareAndSet(null, incoming);
+        if(setted){
+          connection = incoming;
+          break;
         }
+        connection = connectionHolder.get();
+        if(connection != null) break; 
+      }
+      
+      
+      if(connection == incoming){
+        delegate.connectionSucceeded(connection);
+      }else{
+
+        if(closeOnDupe){
+          // close the incoming because another channel was created in the mean time (unless this is a self connection).
+          logger.debug("Closing incoming connection because a connection was already set.");
+          incoming.getChannel().close();
+        }
+        delegate.connectionSucceeded(connection);
       }
     }
-    
-    // no checked future, let's make one.
-    fut = com.getConnectionAsync(endpoint);
-    future.compareAndSet(null, fut);
-    return getConnection(attempt);
+
+    @Override
+    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+      delegate.connectionFailed(type, t);
+    }
     
   }
 
@@ -76,5 +170,20 @@ public class BitConnectionManager {
     return endpoint;
   }
   
+  public void addServerConnection(BitConnection connection){
+    // if the connection holder is not set, set it to this incoming connection.
+    logger.debug("Setting server connection.");
+    this.connectionHolder.compareAndSet(null, connection);
+  }
+
+  @Override
+  public void close() {
+    BitConnection c = connectionHolder.getAndSet(null);
+    if(c != null){
+      c.getChannel().close();
+    }
+  }
+  
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index 88ac6cc..d4665a8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -22,18 +22,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
@@ -43,13 +38,14 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
   
   private final BitComHandler handler;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
+  private final ConnectionManagerRegistry connectionRegistry;
+  private volatile ProxyCloseHandler proxyCloseHandler;
   
-  public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+  public BitServer(BitComHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry, ListenerPool listeners) {
     super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
     this.handler = handler;
-    this.registry = registry;
+    this.connectionRegistry = connectionRegistry;
     this.listeners = listeners;
   }
   
@@ -65,23 +61,36 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
-    return connection.getCloseHandler(super.getCloseHandler(connection));
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+    return proxyCloseHandler;
   }
 
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    return new BitConnection(null, channel, this, registry, listeners);
+    return new BitConnection(channel, this, listeners);
   }
   
   
   @Override
-  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler(final BitConnection connection) {
     return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
       
       @Override
       public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from other bit. {}", inbound);
+//        logger.debug("Handling handshake from other bit. {}", inbound);
         if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+        if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getBitPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.", inbound.getEndpoint()));
+        connection.setEndpoint(inbound.getEndpoint());
+
+        // add the 
+        BitConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
+        
+        // update the close handler.
+        proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+        
+        // add to the connection manager. 
+        manager.addServerConnection(connection);
+
         return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
       }
 
@@ -89,5 +98,30 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   }
 
 
+  private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
+
+    private volatile GenericFutureListener<ChannelFuture>  handler;
+    
+    public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
+      super();
+      this.handler = handler;
+    }
+
+
+    public GenericFutureListener<ChannelFuture> getHandler() {
+      return handler;
+    }
+
+
+    public void setHandler(GenericFutureListener<ChannelFuture> handler) {
+      this.handler = handler;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      handler.operationComplete(future);
+    }
+    
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 652fa52..83b7959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,95 +17,79 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 
-/**
- * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
- * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
- * and action. A better approach should be done.
- */
 public class BitTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
 
-  private static final int MAX_ATTEMPTS = 3;
-
   private final BitConnectionManager manager;
-  private final Executor exec;
-  
+  private final DrillbitEndpoint endpoint;
 
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
-    this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
-    this.exec = exec;
-  }
-
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
-      CheckedFuture<BitConnection, RpcException> future) {
-    this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
-    this.exec = exec;
+  public BitTunnel(DrillbitEndpoint endpoint, BitConnectionManager manager) {
+    this.manager = manager;
+    this.endpoint = endpoint;
   }
   
   public DrillbitEndpoint getEndpoint(){
     return manager.getEndpoint();
   }
 
-  private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
-    exec.execute(command);
-    return command;
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    return submit(new SendBatch(batch, context));
+  public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentContext context, FragmentWritableBatch batch) {
+    SendBatch b = new SendBatch(outcomeListener, batch, context);
+    manager.runCommand(b);
   }
 
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
-    return submit(new SendFragment(fragment));
+  public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){
+    SendFragment b = new SendFragment(outcomeListener, fragment);
+    manager.runCommand(b);
   }
-
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
-    return submit(new CancelFragment(handle));
+  
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+    CancelFragment b = new CancelFragment(handle);
+    manager.runCommand(b);
+    return b.getFuture();
   }
-
+  
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return submit(new SendFragmentStatus(status));
+    SendFragmentStatus b = new SendFragmentStatus(status);
+    manager.runCommand(b);
+    return b.getFuture();
   }
 
-  public class SendBatch extends BitCommand<Ack> {
+  public static class SendBatch extends ListeningBitCommand<Ack> {
     final FragmentWritableBatch batch;
     final FragmentContext context;
 
-    public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
-      super();
+    public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch, FragmentContext context) {
+      super(listener);
       this.batch = batch;
       this.context = context;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      logger.debug("Sending record batch. {}", batch);
-      return connection.sendRecordBatch(context, batch);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
     }
 
+    @Override
+    public String toString() {
+      return "SendBatch [batch.header=" + batch.getHeader() + "]";
+    }
+    
+    
   }
 
-  public class SendFragmentStatus extends BitCommand<Ack> {
+  public static class SendFragmentStatus extends FutureBitCommand<Ack> {
     final FragmentStatus status;
 
     public SendFragmentStatus(FragmentStatus status) {
@@ -114,12 +98,13 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragmentStatus(status);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
     }
+
   }
 
-  public class CancelFragment extends BitCommand<Ack> {
+  public static class CancelFragment extends FutureBitCommand<Ack> {
     final FragmentHandle handle;
 
     public CancelFragment(FragmentHandle handle) {
@@ -128,109 +113,23 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.cancelFragment(handle);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle,  Ack.class);
     }
 
   }
 
-  public class SendFragment extends BitCommand<Ack> {
+  public static class SendFragment extends ListeningBitCommand<Ack> {
     final PlanFragment fragment;
 
-    public SendFragment(PlanFragment fragment) {
-      super();
+    public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) {
+      super(listener);
       this.fragment = fragment;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragment(fragment);
-    }
-
-  }
-
-
-  
-
-  private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
-
-    public void addLightListener(RpcOutcomeListener<T> outcomeListener){
-      this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
-    }
-
-    public BitCommand() {
-      super(SettableFuture.<T> create());
-    }
-
-    public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
-
-    public final void run() {
-      
-      try {
-        
-        BitConnection connection = manager.getConnection(0);
-        assert connection != null : "The connection manager should never return a null connection.  Worse case, it should throw an exception.";
-        CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
-        rpc.addListener(new FutureBridge<T>((SettableFuture<T>) delegate(), rpc), MoreExecutors.sameThreadExecutor());
-      } catch (RpcException ex) {
-        ((SettableFuture<T>) delegate()).setException(ex);
-      }
-
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if (e instanceof ExecutionException) {
-        t = e.getCause();
-      }
-      if (t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
-
-    public class RpcOutcomeListenerWrapper implements Runnable{
-      final RpcOutcomeListener<T> inner;
-      
-      public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
-        this.inner = inner;
-      }
-
-      @Override
-      public void run() {
-        try{
-          inner.success(BitCommand.this.checkedGet());
-        }catch(RpcException e){
-          inner.failed(e);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "BitCommand ["+this.getClass().getSimpleName()+"]";
-    }
-    
-    
-    
-  }
-
-  private class FutureBridge<T> implements Runnable {
-    final SettableFuture<T> out;
-    final CheckedFuture<T, RpcException> in;
-
-    public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
-      super();
-      this.out = out;
-      this.in = in;
-    }
-
-    @Override
-    public void run() {
-      try {
-        out.set(in.checkedGet());
-      } catch (RpcException ex) {
-        out.setException(ex);
-      }
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
new file mode 100644
index 0000000..8afbc33
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class ConnectionManagerRegistry implements Iterable<BitConnectionManager>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
+  
+  private final ConcurrentMap<DrillbitEndpoint, BitConnectionManager> registry = Maps.newConcurrentMap();
+  
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private volatile DrillbitEndpoint localEndpoint;
+  
+  public ConnectionManagerRegistry(BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    super();
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+
+  public BitConnectionManager getConnectionManager(DrillbitEndpoint endpoint){
+    assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
+    BitConnectionManager m = registry.get(endpoint);
+    if(m == null){
+      m = new BitConnectionManager(endpoint, localEndpoint, handler, context, listenerPool);
+      BitConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+      if(m2 != null) m = m2;
+    }
+    
+    return m;
+  }
+
+  @Override
+  public Iterator<BitConnectionManager> iterator() {
+    return registry.values().iterator();
+  }
+  
+  public void setEndpoint(DrillbitEndpoint endpoint){
+    this.localEndpoint = endpoint;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
new file mode 100644
index 0000000..fa3b518
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class FutureBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
+
+  protected final SettableFuture<T> settableFuture;
+  private final RpcCheckedFuture<T> parentFuture;
+
+  public FutureBitCommand() {
+    this.settableFuture = SettableFuture.create();
+    this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+  }
+
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    
+    doRpcCall(new DeferredRpcOutcome(), connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+    @Override
+    public void failed(RpcException ex) {
+      settableFuture.setException(ex);
+    }
+
+    @Override
+    public void success(T value) {
+      settableFuture.set(value);
+    }
+
+  }
+
+  public DrillRpcFuture<T> getFuture() {
+    return parentFuture;
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    settableFuture.setException(RpcException.mapException(
+        String.format("Command failed while establishing connection.  Failure type %s.", type), t));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
index 8f299d2..84dba85 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -22,32 +22,35 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.foreman.FragmentStatusListener;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 
 public class ListenerPool {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
   
-  private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+  private final ConcurrentMap<QueryId, FragmentStatusListener> listeners;
   
   public ListenerPool(int par){
-    listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+    listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(16, 0.75f, par);
   }
   
   public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+    logger.debug("Removing framgent status listener for handle {}.", handle);
     listeners.remove(handle);
   }
   
   public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
-    FragmentStatusListener old = listeners.putIfAbsent(handle, listener);
+    logger.debug("Adding framgent status listener for handle {}.", handle);
+    FragmentStatusListener old = listeners.putIfAbsent(handle.getQueryId(), listener);
     if(old != null) throw new RpcException("Failure.  The provided handle already exists in the listener pool.  You need to remove one listener before adding another.");
   }
   
   public void status(FragmentStatus status){
-    FragmentStatusListener l = listeners.get(status.getHandle());
+    FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
     if(l == null){
-      logger.info("A fragment message arrived but there was no registered listener for that message.");
+      
+      logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", status.getHandle());
       return;
     }else{
       l.statusUpdate(status);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
new file mode 100644
index 0000000..90db6a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningBitCommand.class);
+
+  private final RpcOutcomeListener<T> listener;
+
+  public ListeningBitCommand(RpcOutcomeListener<T> listener) {
+    this.listener = listener;
+  }
+
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    
+    doRpcCall(new DeferredRpcOutcome(), connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.failed(ex);
+    }
+
+    @Override
+    public void success(T value) {
+      listener.success(value);
+    }
+
+  }
+
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    listener.failed(RpcException.mapException(
+        String.format("Command failed while establishing connection.  Failure type %s.", type), t));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 3df88b7..779085c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -45,5 +45,12 @@ public class QueryResultBatch {
   public boolean hasData(){
     return data != null;
   }
+
+  @Override
+  public String toString() {
+    return "QueryResultBatch [header=" + header + ", data=" + data + "]";
+  }
+  
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
new file mode 100644
index 0000000..0aa7c86
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Encapsulates the future management of query submissions. This entails a potential race condition. Normal ordering is:
+ * 1. Submit query to be executed. 2. Receive QueryHandle for buffer management 3. Start receiving results batches for
+ * query.
+ * 
+ * However, 3 could potentially occur before 2. As such, we need to handle this case and then do a switcheroo.
+ * 
+ */
+public class QueryResultHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+  
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener){
+    return new SubmissionListener(listener);
+  }
+  
+  public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
+    final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    UserResultsListener l = resultsListener.get(result.getQueryId());
+    // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
+    if (l != null) {
+      // logger.debug("Results listener available, using existing.");
+      l.resultArrived(batch);
+      if (result.getIsLastChunk()) {
+        resultsListener.remove(result.getQueryId(), l);
+      }
+    } else {
+      logger.debug("Results listener not available, creating a buffering listener.");
+      // manage race condition where we start getting results before we receive the queryid back.
+      BufferingListener bl = new BufferingListener();
+      l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+      if (l != null) {
+        l.resultArrived(batch);
+      } else {
+        bl.resultArrived(batch);
+      }
+    }
+  }
+
+  private class BufferingListener implements UserResultsListener {
+
+    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile UserResultsListener output;
+
+    public boolean transferTo(UserResultsListener l) {
+      synchronized (this) {
+        output = l;
+        boolean last = false;
+        for (QueryResultBatch r : results) {
+          l.resultArrived(r);
+          last = r.getHeader().getIsLastChunk();
+        }
+        return last;
+      }
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      synchronized (this) {
+        if (output == null) {
+          this.results.add(result);
+        } else {
+          output.resultArrived(result);
+        }
+      }
+    }
+
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+
+  }
+
+  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+    private UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+
+      // we need to deal with the situation where we already received results by the time we got the query id back. In
+      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
+      // results during the transition
+      if (oldListener != null) {
+        logger.debug("Unable to place user results listener, buffering listener was already in place.");
+        if (oldListener instanceof BufferingListener) {
+          resultsListener.remove(oldListener);
+          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+          // simply remove the buffering listener if we already have the last response.
+          if (all) {
+            resultsListener.remove(oldListener);
+          } else {
+            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+            if (!replaced) throw new IllegalStateException();
+          }
+        } else {
+          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+        }
+      }
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5d2e799..ad44ff2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -21,11 +21,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -36,115 +31,27 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import com.google.protobuf.MessageLite;
 
-public class UserClient extends BasicClientWithConnection<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
 
-  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+  private final QueryResultHandler queryResultHandler = new QueryResultHandler();
 
   public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
-  }
-
-  public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
-    this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
-    return resultsListener.getFuture();
+    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
   }
 
-  public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
-    return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
-  }
-  
-  private class BufferingListener extends UserResultsListener {
-
-    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
-    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private volatile UserResultsListener output;
-
-    public boolean transferTo(UserResultsListener l) {
-      lock.writeLock().lock();
-      output = l;
-      boolean last = false;
-      for (QueryResultBatch r : results) {
-        l.resultArrived(r);
-        last = r.getHeader().getIsLastChunk();
-      }
-      if (future.isDone()) {
-        l.set();
-      }
-      return last;
-    }
-
-    @Override
-    public void resultArrived(QueryResultBatch result) {
-      logger.debug("Result arrvied.");
-      lock.readLock().lock();
-      try {
-        if (output == null) {
-          this.results.add(result);
-        } else {
-          output.resultArrived(result);
-        }
-
-      } finally {
-        lock.readLock().unlock();
-      }
-
-    }
-
-    @Override
-    public void submissionFailed(RpcException ex) {
-      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
-    }
-
+  public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
+    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
   }
 
-  private class SubmissionListener extends RpcOutcomeListener<QueryId> {
-    private UserResultsListener listener;
-
-    public SubmissionListener(UserResultsListener listener) {
-      super();
-      this.listener = listener;
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.submissionFailed(ex);
-    }
-
-    @Override
-    public void success(QueryId queryId) {
-      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
-      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
-      // we need to deal with the situation where we already received results by the time we got the query id back. In
-      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
-      // results during the transition
-      if (oldListener != null) {
-        logger.debug("Unable to place user results listener, buffering listener was already in place.");
-        if (oldListener instanceof BufferingListener) {
-          resultsListener.remove(oldListener);
-          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
-          // simply remove the buffering listener if we already have the last response.
-          if (all) {
-            resultsListener.remove(oldListener);
-          } else {
-            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
-            if (!replaced) throw new IllegalStateException();
-          }
-        } else {
-          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
-        }
-      }
-
-    }
-
+  public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
+    UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
+    this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort());
   }
 
   @Override
@@ -165,29 +72,7 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     switch (rpcType) {
     case RpcType.QUERY_RESULT_VALUE:
-      final QueryResult result = get(pBody, QueryResult.PARSER);
-      final QueryResultBatch batch = new QueryResultBatch(result, dBody);
-      UserResultsListener l = resultsListener.get(result.getQueryId());
-//      logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-      if (l != null) {
-//        logger.debug("Results listener available, using existing.");
-        l.resultArrived(batch);
-        if (result.getIsLastChunk()) {
-          resultsListener.remove(result.getQueryId(), l);
-          l.set();
-        }
-      } else {
-        logger.debug("Results listener not available, creating a buffering listener.");
-        // manage race condition where we start getting results before we receive the queryid back.
-        BufferingListener bl = new BufferingListener();
-        l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-        if (l != null) {
-          l.resultArrived(batch);
-        } else {
-          bl.resultArrived(batch);
-        }
-      }
-
+      queryResultHandler.batchArrived(pBody, dBody);
       return new Response(RpcType.ACK, Ack.getDefaultInstance());
     default:
       throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
@@ -196,18 +81,16 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   }
 
   @Override
-  protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+  protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+    logger.debug("Handling handshake from bit to user. {}", inbound);
+    if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
+      throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(),
+          UserRpcConfig.RPC_VERSION));
 
-      @Override
-      protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit to user. {}", inbound);
-        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
-          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
-              inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
-      }
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3ce14f0..b1dbfe8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,17 +24,8 @@ import org.apache.drill.exec.rpc.RpcException;
 
 import com.google.common.util.concurrent.SettableFuture;
 
-public abstract class UserResultsListener {
-  SettableFuture<Void> future = SettableFuture.create();
+public interface UserResultsListener {
   
-  final void set(){
-    future.set(null);
-  }
-  
-  Future<Void> getFuture(){
-    return future;
-  }
-
   public abstract void submissionFailed(RpcException ex); 
   public abstract void resultArrived(QueryResultBatch result);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 406afc4..908af61 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -26,16 +26,15 @@ import io.netty.channel.EventLoopGroup;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -100,8 +99,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       super(channel);
     }
 
-    public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
-      return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+      logger.debug("Sending result to client with {}", result);
+      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
     }
 
   }
@@ -112,7 +112,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
   
   @Override
-  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(UserClientConnection connection) {
     return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3c4d9af..ed13748 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
   public BootStrapContext(DrillConfig config) {
     super();
     this.config = config;
-    this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
     this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
     this.allocator = BufferAllocator.getAllocator(config);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 0337a68..199768f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.LocalCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.LocalClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 
 public class RemoteServiceSet implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -37,6 +38,7 @@ public class RemoteServiceSet implements Closeable{
     this.coordinator = coordinator;
   }
 
+
   public DistributedCache getCache() {
     return cache;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d6d3b9c..b07f274 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -53,12 +53,12 @@ public class ServiceEngine implements Closeable{
   
   public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
     int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
-    int bitPort = bitCom.start();
-    return DrillbitEndpoint.newBuilder()
+    DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
         .setAddress(InetAddress.getLocalHost().getHostAddress())
-        .setBitPort(bitPort)
         .setUserPort(userPort)
         .build();
+
+    return bitCom.start(partialEndpoint);
   }
 
   public BitCom getBitCom(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
index f6a9786..9a72845 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -18,10 +18,9 @@
 package org.apache.drill.exec.work;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 
-public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+public abstract class EndpointListener<RET, V> extends BaseRpcOutcomeListener<RET>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
 
   protected final DrillbitEndpoint endpoint;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
index 2900d99..554b398 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -65,6 +65,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
   
   @Override
   public void run() {
+    logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
     if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
       internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
       return;
@@ -76,7 +77,12 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     try{
       while(state.get() == FragmentState.RUNNING_VALUE){
         if(!root.next()){
-          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          if(context.isFailed()){
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);  
+          }else{
+            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          }
+          
         }
       }
       
@@ -90,7 +96,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     }finally{
       t.stop();
     }
-    
+    logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
   }
   
   private void internalFail(Throwable excep){


Mime
View raw message