drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [05/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 01:52:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index bc942ac..52bb0a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -17,9 +17,9 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-import com.google.protobuf.Internal.EnumLite;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -27,23 +27,32 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
-import org.apache.drill.exec.exception.DrillbitStartupException;
 
 import java.io.IOException;
 import java.net.BindException;
 
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
 /**
  * A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
  * requests will generate more than one outbound request.
  */
-public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
+public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
 
   private ServerBootstrap b;
   private volatile boolean connect = false;
+  private final EventLoopGroup eventLoopGroup;
 
-  public BasicServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-
+  public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+    super(rpcMapping);
+    this.eventLoopGroup = eventLoopGroup;
+    
     b = new ServerBootstrap() //
         .channel(NioServerSocketChannel.class) //
         .option(ChannelOption.SO_BACKLOG, 100) //
@@ -56,17 +65,19 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             
-            ch.closeFuture().addListener(getCloseHandler(ch));
+            C connection = initRemoteConnection(ch);
+            ch.closeFuture().addListener(getCloseHandler(connection));
 
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
-                new RpcDecoder(), //
-                new RpcEncoder(), //
-                new InboundHandler(ch), //
+                new RpcDecoder(rpcConfig.getName()), //
+                new RpcEncoder(rpcConfig.getName()), //
+                getHandshakeHandler(),
+                new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );            
-            channel = ch;
             connect = true;
+            
           }
         });
   }
@@ -76,12 +87,34 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
     return false;
   }
 
+  
+  protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
 
+  protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
+
+    public ServerHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
+      super(handshakeType, parser);
+    }
+
+    @Override
+    protected final void consumeHandshake(Channel c, T inbound) throws Exception {
+      OutboundRpcMessage msg = new OutboundRpcMessage(RpcMode.RESPONSE, this.handshakeType, coordinationId, getHandshakeResponse(inbound));
+      c.write(msg);
+    }
+    
+    public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
+    
+
+      
+    
+  }
+  
+  
   public int bind(final int initialPort) throws InterruptedException, DrillbitStartupException{
-    int port = initialPort;
+    int port = initialPort-1;
     while (true) {
       try {
-        b.bind(port++).sync();
+        b.bind(++port).sync();
         break;
       } catch (Exception e) {
         if (e instanceof BindException)
@@ -89,13 +122,15 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
         throw new DrillbitStartupException("Could not bind Drillbit", e);
       }
     }
+    
     connect = !connect;
+    logger.debug("Server started on port {} of type {} ", port, this.getClass().getSimpleName());
     return port;    
   }
 
   @Override
   public void close() throws IOException {
-    if(b != null) b.shutdown();
+    eventLoopGroup.shutdownGracefully();
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index c796e2d..70142bb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -29,21 +29,21 @@ public class CoordinationQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
 
   private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, DrillRpcFuture<?>> map;
+  private final Map<Integer, DrillRpcFutureImpl<?>> map;
 
   public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, DrillRpcFuture<?>>(segmentSize, 0.75f, segmentCount);
+    map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
   }
 
   void channelClosed(Exception ex) {
-    for (DrillRpcFuture<?> f : map.values()) {
+    for (DrillRpcFutureImpl<?> f : map.values()) {
       f.setException(ex);
     }
   }
 
-  public <V> DrillRpcFuture<V> getNewFuture(Class<V> clazz) {
+  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
     int i = circularInt.getNext();
-    DrillRpcFuture<V> future = DrillRpcFuture.getNewFuture(i, clazz);
+    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
     // logger.debug("Writing to map coord {}, future {}", i, future);
     Object old = map.put(i, future);
     if (old != null)
@@ -52,8 +52,8 @@ public class CoordinationQueue {
     return future;
   }
 
-  private DrillRpcFuture<?> removeFromMap(int coordinationId) {
-    DrillRpcFuture<?> rpc = map.remove(coordinationId);
+  private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
+    DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
     if (rpc == null) {
       logger.error("Rpc is null.");
       throw new IllegalStateException(
@@ -62,23 +62,25 @@ public class CoordinationQueue {
     return rpc;
   }
 
-  public <V> DrillRpcFuture<V> getFuture(int coordinationId, Class<V> clazz) {
+  public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
     // logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
-    DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
     // logger.debug("Got rpc from map {}", rpc);
     Class<?> outcomeClass = rpc.getOutcomeClass();
+
     if (outcomeClass != clazz) {
-      logger.error("Rpc class is not expected class. Original: {}, requested: {}", outcomeClass.getCanonicalName(), clazz.getCanonicalName());
+
       throw new IllegalStateException(
           String
               .format(
-                  "You attempted to request a future for a coordination id that has a different value class than was used when you "
-                      + "initially created the coordination id.  Requested class %s, originally expected class %s.  This shouldn't happen.  ",
-                  clazz.getCanonicalName(), outcomeClass.getCanonicalName()));
+                  "RPC Engine had a submission and response configuration mismatch.  The RPC request that you submitted was defined with an expected response type of %s.  However, "
+                      + "when the response returned, a call to getResponseDefaultInstance() with Rpc number %d provided an expected class of %s.  This means either your submission uses the wrong type definition"
+                      + "or your getResponseDefaultInstance() method responds the wrong instance type ",
+                  clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
     }
 
     @SuppressWarnings("unchecked")
-    DrillRpcFuture<V> crpc = (DrillRpcFuture<V>) rpc;
+    DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
 
     // logger.debug("Returning casted future");
     return crpc;
@@ -86,7 +88,7 @@ public class CoordinationQueue {
 
   public void updateFailedFuture(int coordinationId, RpcFailure failure) {
     // logger.debug("Updating failed future.");
-    DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
     rpc.setException(new RemoteRpcException(failure));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index 9a4a7f7..bae947a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -17,80 +17,10 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-import java.util.concurrent.ExecutionException;
+import com.google.common.util.concurrent.CheckedFuture;
 
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class DrillRpcFuture<V> extends AbstractCheckedFuture<V, RpcException> {
+public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
 
-  final int coordinationId;
-  private final Class<V> clazz;
-
-  public DrillRpcFuture(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
-    super(delegate);
-    this.coordinationId = coordinationId;
-    this.clazz = clazz;
-  }
-
-  public Class<V> getOutcomeClass(){
-    return clazz;
-  }
-  
-  /**
-   * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
-   * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
-   * will result in an UnsupportedOperationException.
-   */
-  @Override
-  public boolean cancel(boolean mayInterruptIfRunning) {
-    throw new UnsupportedOperationException(
-        "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
-  }
-
-  @Override
-  protected RpcException mapException(Exception ex) {
-    if (ex instanceof RpcException)  return (RpcException) ex;
-    
-    if (ex instanceof ExecutionException) {
-      Throwable e2 = ex.getCause();
-      
-      if (e2 instanceof RpcException) {
-        return (RpcException) e2;
-      }
-    }
-    return new RpcException(ex);
-
-  }
-
-  @SuppressWarnings("unchecked")
-  void setValue(Object value) {
-    assert clazz.isAssignableFrom(value.getClass());
-    ((InnerFuture<V>) super.delegate()).setValue((V) value);
-  }
-
-  boolean setException(Throwable t) {
-    return ((InnerFuture<V>) super.delegate()).setException(t);
-  }
-
-  public static class InnerFuture<T> extends AbstractFuture<T> {
-    // we rewrite these so that the parent can see them
-
-    void setValue(T value) {
-      super.set(value);
-    }
-
-    protected boolean setException(Throwable t) {
-      return super.setException(t);
-    }
-  }
-
-  public static <V> DrillRpcFuture<V> getNewFuture(int coordinationId, Class<V> clazz) {
-    InnerFuture<V> f = new InnerFuture<V>();
-    return new DrillRpcFuture<V>(f, coordinationId, clazz);
-  }
-
-
-}
\ No newline at end of file
+  public void addLightListener(RpcOutcomeListener<T> outcomeListener);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
new file mode 100644
index 0000000..ee14eeb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -0,0 +1,118 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
+
+  final int coordinationId;
+  private final Class<V> clazz;
+
+  public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
+    super(delegate);
+    this.coordinationId = coordinationId;
+    this.clazz = clazz;
+  }
+
+  public Class<V> getOutcomeClass(){
+    return clazz;
+  }
+  
+  /**
+   * Drill doesn't currently support rpc cancellations since nearly all requests should be either instant or
+   * asynchronous. Business level cancellation is managed by a separate call (e.g. canceling a query). Calling this
+   * method will result in an UnsupportedOperationException.
+   */
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    throw new UnsupportedOperationException(
+        "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
+  }
+
+  @Override
+  protected RpcException mapException(Exception ex) {
+    Throwable e = ex;
+    while(e instanceof ExecutionException){
+      e = e.getCause();
+    }
+    if (e instanceof RpcException)  return (RpcException) e;
+
+    return new RpcException(ex);
+
+  }
+
+  @SuppressWarnings("unchecked")
+  void setValue(Object value) {
+    assert clazz.isAssignableFrom(value.getClass());
+    ((InnerFuture<V>) super.delegate()).setValue((V) value);
+  }
+
+  boolean setException(Throwable t) {
+    return ((InnerFuture<V>) super.delegate()).setException(t);
+  }
+
+  public static class InnerFuture<T> extends AbstractFuture<T> {
+    // we rewrite these so that the parent can see them
+
+    void setValue(T value) {
+      super.set(value);
+    }
+
+    protected boolean setException(Throwable t) {
+      return super.setException(t);
+    }
+  }
+
+  public class RpcOutcomeListenerWrapper implements Runnable{
+    final RpcOutcomeListener<V> inner;
+    
+    public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
+      super();
+      this.inner = inner;
+    }
+
+    @Override
+    public void run() {
+      try{
+        inner.success(DrillRpcFutureImpl.this.checkedGet());
+      }catch(RpcException e){
+        inner.failed(e);
+      }
+    }
+  }
+  
+  public void addLightListener(RpcOutcomeListener<V> outcomeListener){
+    this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+  }
+  
+  
+  
+  public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
+    InnerFuture<V> f = new InnerFuture<V>();
+    return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
index ab977db..be1ff6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
@@ -18,15 +18,20 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+import java.io.InputStream;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 
 public class InboundRpcMessage extends RpcMessage{
   public ByteBuf pBody;
+  public ByteBuf dBody;
   
   public InboundRpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
-    super(mode, rpcType, coordinationId, dBody);
+    super(mode, rpcType, coordinationId);
     this.pBody = pBody;
+    this.dBody = dBody;
   }
   
   public int getBodySize(){
@@ -37,7 +42,7 @@ public class InboundRpcMessage extends RpcMessage{
   
   void release(){
     pBody.release();
-    super.release();
+    if(dBody != null) dBody.release();
   }
 
   @Override
@@ -46,5 +51,7 @@ public class InboundRpcMessage extends RpcMessage{
         + coordinationId + ", dBody=" + dBody + "]";
   }
   
-  
+  public InputStream getProtobufBodyAsIS(){
+    return new ByteBufInputStream(pBody);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
index 91c3d45..e4858c4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import java.util.Arrays;
+
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -24,28 +26,49 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 
-class OutboundRpcMessage extends RpcMessage{
+public class OutboundRpcMessage extends RpcMessage {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundRpcMessage.class);
 
   final MessageLite pBody;
-  
-  public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf dBody) {
-    super(mode, rpcType.getNumber(), coordinationId, dBody);
+  public ByteBuf[] dBodies;
+
+  public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
+    super(mode, rpcType.getNumber(), coordinationId);
     this.pBody = pBody;
+    this.dBodies = dBodies;
   }
-  
-  public int getBodySize(){
+
+  public int getBodySize() {
     int len = pBody.getSerializedSize();
     len += RpcEncoder.getRawVarintSize(len);
-    if(dBody != null) len += dBody.capacity();
+    len += getRawBodySize();
     return len;
   }
 
+  public int getRawBodySize(){
+    if(dBodies == null) return 0;
+    int len = 0;
+    
+    for (int i = 0; i < dBodies.length; i++) {
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reader Index {}, Writer Index {}", dBodies[i].readerIndex(), dBodies[i].writerIndex());
+      len += dBodies[i].readableBytes();
+    }
+    return len;
+  }
+  
   @Override
   public String toString() {
     return "OutboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType + ", coordinationId="
-        + coordinationId + ", dBody=" + dBody + "]";
+        + coordinationId + ", dBodies=" + Arrays.toString(dBodies) + "]";
   }
-
   
+  void release(){
+    if(dBodies != null){
+      for(ByteBuf b : dBodies){
+        b.release();
+      }
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
new file mode 100644
index 0000000..cedba10
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+
+public class RemoteConnection{
+  private final Channel channel;
+  
+  public RemoteConnection(Channel channel) {
+    super();
+    this.channel = channel;
+  }
+
+
+  public final Channel getChannel() {
+    return channel;
+  }
+
+
+  public ConnectionThrottle getConnectionThrottle(){
+    // can't be implemented until we switch to per query sockets.
+    return null;
+  }
+  
+  public interface ConnectionThrottle{
+    public void disableReceiving();
+    public void enableReceiving();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
index 4bd592b..0c4ab7a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+
 import io.netty.buffer.ByteBuf;
 
 import com.google.protobuf.Internal.EnumLite;
@@ -27,13 +29,13 @@ public class Response {
   
   public EnumLite rpcType;
   public MessageLite pBody;
-  public ByteBuf dBody;
+  public ByteBuf[] dBodies;
   
-  public Response(EnumLite rpcType, MessageLite pBody, ByteBuf dBody) {
+  public Response(EnumLite rpcType, MessageLite pBody, ByteBuf... dBodies) {
     super();
     this.rpcType = rpcType;
     this.pBody = pBody;
-    this.dBody = dBody;
+    this.dBodies = dBodies;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 76300d1..11764db 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,113 +23,140 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundMessageHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
+import java.util.Arrays;
 import java.util.concurrent.CancellationException;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 /**
- * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
+ * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
+ * system.
+ * 
  * @param <T>
  */
-public abstract class RpcBus<T extends EnumLite> implements Closeable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcBus.class);
-  
-  private CoordinationQueue queue = new CoordinationQueue(16, 16);
-  protected Channel channel;
+public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+  protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
 
   protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
-  protected abstract Response handle(SocketChannel channel, int RpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
-  public abstract boolean isClient(); 
 
-  
-  protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
-      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf dataBody) throws RpcException {
+  protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+
+  public abstract boolean isClient();
+
+  protected final RpcConfig rpcConfig;
+
+  public RpcBus(RpcConfig rpcConfig) {
+    this.rpcConfig = rpcConfig;
+  }
+
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+
+    assert !Arrays.asList(dataBodies).contains(null);
+    assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
+
     ByteBuf pBuffer = null;
     boolean completed = false;
 
     try {
-//      logger.debug("Seding message");
+      // logger.debug("Sending message");
       Preconditions.checkNotNull(protobufBody);
-      DrillRpcFuture<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
-      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBody);
-      ChannelFuture channelFuture = channel.write(m);
+      DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
+      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+      ChannelFuture channelFuture = connection.getChannel().write(m);
       channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
       completed = true;
       return rpcFuture;
     } finally {
       if (!completed) {
         if (pBuffer != null) pBuffer.release();
-        if (dataBody != null) dataBody.release();
+        if (dataBodies != null) {
+          for (ByteBuf b : dataBodies) {
+            b.release();
+          }
+
+        }
       }
       ;
     }
   }
 
-  
-  public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture>{
+  public abstract C initRemoteConnection(Channel channel);
+
+  public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel().remoteAddress());
-      queue.channelClosed(new ChannelClosedException());
+      logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
+          .remoteAddress());
+      closeQueueDueToChannelClose();
     }
   }
-  
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch){
+
+  protected void closeQueueDueToChannelClose() {
+    if (this.isClient()) queue.channelClosed(new ChannelClosedException());
+  }
+
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
     return new ChannelClosedHandler();
   }
-  
+
   protected class InboundHandler extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
 
-    private final SocketChannel channel;
-    
-    
-    public InboundHandler(SocketChannel channel) {
+    private final C connection;
+    public InboundHandler(C connection) {
       super();
-      this.channel = channel;
+      this.connection = connection;
     }
 
-
     @Override
     public void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage msg) throws Exception {
-      if(!ctx.channel().isOpen()) return;
+      if (!ctx.channel().isOpen()) return;
 
-      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
-      switch(msg.mode){
+      if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
+      switch (msg.mode) {
       case REQUEST:
         // handle message and ack.
-        Response r = handle(channel, msg.rpcType, msg.pBody, msg.dBody);
-        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId, r.pBody, r.dBody);
-        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
+        Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
+        assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId,
+            r.pBody, r.dBodies);
+        if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
         ctx.write(outMessage);
         break;
-        
+
       case RESPONSE:
         MessageLite m = getResponseDefaultInstance(msg.rpcType);
-        DrillRpcFuture<?> rpcFuture = queue.getFuture(msg.coordinationId, m.getClass());
+        assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
         rpcFuture.setValue(value);
-        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+        if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+
         break;
-        
+
       case RESPONSE_FAILURE:
         RpcFailure failure = RpcFailure.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
         queue.updateFailedFuture(msg.coordinationId, failure);
-        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+        if (RpcConstants.EXTRA_DEBUGGING)
+          logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
         break;
-        
+
       default:
-        throw new UnsupportedOperationException(); 
+        throw new UnsupportedOperationException();
       }
     }
 
@@ -147,18 +174,18 @@ public abstract class RpcBus<T extends EnumLite> implements Closeable{
 
     @Override
     public void operationComplete(ChannelFuture channelFuture) throws Exception {
-//      logger.debug("Completed channel write.");
-      
+      // logger.debug("Completed channel write.");
+
       if (channelFuture.isCancelled()) {
-        DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
         rpcFuture.setException(new CancellationException("Socket operation was canceled."));
       } else if (!channelFuture.isSuccess()) {
         try {
           channelFuture.get();
-          throw new IllegalStateException(
-              "Future was described as completed and not succesful but did not throw an exception.");
+          throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
         } catch (Exception e) {
-          DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+          logger.error("Error occurred during Rpc", e);
+          DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
           rpcFuture.setException(e);
         }
       } else {
@@ -168,6 +195,13 @@ public abstract class RpcBus<T extends EnumLite> implements Closeable{
     }
 
   }
-  
-  
+
+  public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{ // decodes the bytes of pBody into a T using the supplied protobuf parser
+    try {
+      ByteBufInputStream is = new ByteBufInputStream(pBody); // wrap the buffer as an InputStream so the parser can read from it
+      return parser.parseFrom(is);
+    } catch (InvalidProtocolBufferException e) { // only protobuf decode failures are translated; anything else propagates unchanged
+      throw new RpcException(String.format("Failure while decoding message with parser of type. %s", parser.getClass().getCanonicalName()), e); // e is handed to the RpcException along with the parser's class name
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
new file mode 100644
index 0000000..c6b4c49
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+
+public class RpcConfig { // Named table of RPC message registrations plus the checks that validate sends, receives and responses against it.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
+
+  private final String name; // prefixes every IllegalStateException message raised by the checks below
+  private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap; // keyed by the send enum of each registration
+  private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap; // keyed by the number of the receive enum of each registration
+  
+  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap){
+    this.name = name;
+    this.sendMap = ImmutableMap.copyOf(sendMap); // copy, so later changes to the builder's maps do not reach this config
+    this.receiveMap = ImmutableMap.copyOf(receiveMap);
+  }
+  /** Returns the name this config was built with. */
+  public String getName() {
+    return name;
+  }
+  /** Checks that receiveClass is the registered return class for the receive number rpcType; returns true or throws IllegalStateException. */
+  public boolean checkReceive(int rpcType, Class<?> receiveClass){
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
+    RpcMessageType<?,?,?> type = receiveMap.get(rpcType);
+    if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType));
+
+    if(receiveClass != type.getRet()){ // exact Class identity; a subclass of the registered class does not match
+      throw new IllegalStateException(String.format("%s: The definition for receive doesn't match implementation code.  The definition is %s however the current receive for this type was of type %s.", name, type, receiveClass.getCanonicalName()));
+    }
+    return true; // never returns false: failures throw, which lets callers wrap the call in an assert
+  }
+  /** Checks that the registration for the send enum declares sendClass as its send class and receiveClass as its return class; returns true or throws IllegalStateException. */
+  public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass){
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking send classes for send RpcType %s.  Send Class is %s and Receive class is %s.", send, sendClass, receiveClass));
+    RpcMessageType<?,?,?> type = sendMap.get(send);
+    if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send));
+
+    if(type.getSend() != sendClass) throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to send an object of type %s.", name, type, sendClass.getCanonicalName()));
+    if(type.getRet() != receiveClass) throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code.  The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, type, receiveClass.getCanonicalName()));
+
+    return true;
+  }
+  /** Checks that responseClass is the registered return class for responseType, looked up by its number in the receive map; returns true or throws IllegalStateException. */
+  public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass){
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("Checking responce send of type %s with response class of %s.",  responseType, responseClass));
+    RpcMessageType<?,?,?> type = receiveMap.get(responseType.getNumber());
+    if(type == null) throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType));
+    if(type.getRet() != responseClass) throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code.  The definition is %s however the current response is trying to response with an object of type %s.", name, type, responseClass.getCanonicalName()));
+    
+    return true;
+  }
+  /** One registration: a send enum with its send message class, paired with a receive enum and the returned message class. */
+  public static class RpcMessageType<SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>{
+    private T sendEnum;
+    private Class<SEND> send;
+    private T receiveEnum;
+    private Class<RECEIVE> ret;
+    public RpcMessageType(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> ret) {
+      super();
+      this.sendEnum = sendEnum;
+      this.send = send;
+      this.receiveEnum = receiveEnum;
+      this.ret = ret;
+    }
+    public Class<SEND> getSend() {
+      return send;
+    }
+    public void setSend(Class<SEND> send) { // NOTE(review): the setters change only this object; the map keys in RpcConfig are not updated
+      this.send = send;
+    }
+    public T getSendEnum() {
+      return sendEnum;
+    }
+    public void setSendEnum(T sendEnum) {
+      this.sendEnum = sendEnum;
+    }
+    public Class<RECEIVE> getRet() {
+      return ret;
+    }
+    public void setRet(Class<RECEIVE> ret) {
+      this.ret = ret;
+    }
+    public T getReceiveEnum() {
+      return receiveEnum;
+    }
+    public void setReceiveEnum(T receiveEnum) {
+      this.receiveEnum = receiveEnum;
+    }
+    @Override
+    public String toString() { // this text is what the "The definition is %s" parts of the check messages print
+      return "RpcMessageType [sendEnum=" + sendEnum + ", send=" + send + ", receiveEnum=" + receiveEnum + ", ret="
+          + ret + "]";
+    }
+    
+    
+  }
+  /** Starts a builder for a config with the given name. */
+  public static RpcConfigBuilder newBuilder(String name){
+    return new RpcConfigBuilder(name);
+  }
+  /** Collects registrations in plain hash maps; adding the same send enum or receive number again replaces the earlier entry. */
+  public static class RpcConfigBuilder {
+    private final String name;
+    private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
+    private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();  
+    
+    private RpcConfigBuilder(String name){
+      this.name = name;
+    }
+    /** Registers one pairing under both its send enum and the number of its receive enum; returns this builder for chaining. */
+    public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>  RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec){
+      RpcMessageType<SEND, RECEIVE, T> type = new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec);
+      this.sendMap.put(sendEnum, type);
+      this.receiveMap.put(receiveEnum.getNumber(), type);
+      return this;
+    }
+    /** Creates the RpcConfig from everything added so far. */
+    public RpcConfig build(){
+      return new RpcConfig(name, sendMap, receiveMap);
+
+    }
+  }
+  
+  
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
index 134e54b..4e9714b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.MessageBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.CorruptedFrameException;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -32,14 +33,19 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
  * Converts a previously length adjusted buffer into an RpcMessage.
  */
 class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class);
+  final org.slf4j.Logger logger;
   
   private final AtomicLong messageCounter = new AtomicLong();
   
+  public RpcDecoder(String name){ // name becomes the last segment of this instance's logger category
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "." + name); // category is "<RpcDecoder canonical name>.<name>"
+  }
+
+  
   @Override
-  protected InboundRpcMessage decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
     if(!ctx.channel().isOpen()){
-      return null;
+      return;
     }
     
     if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Inbound rpc message received.");
@@ -94,7 +100,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
     buffer.skipBytes(dBodyLength);
     messageCounter.incrementAndGet();
     if (RpcConstants.EXTRA_DEBUGGING) logger.trace("Inbound Rpc Message Decoded {}.", m);
-    return m;
+    out.add(m);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index 8d3d97c..f76d648 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -34,7 +34,7 @@ import com.google.protobuf.WireFormat;
  * Converts an RPCMessage into wire format.
  */
 class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class);
+  final org.slf4j.Logger logger;
   
   static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
   static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
@@ -43,6 +43,9 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
   static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
   static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
   
+  public RpcEncoder(String name){ // name becomes the last segment of this instance's logger category
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "." + name); // category is "<RpcEncoder canonical name>.<name>"
+  }
   
   @Override
   public void flush(ChannelHandlerContext ctx, OutboundRpcMessage msg) throws Exception {
@@ -61,7 +64,7 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
       // figure out the full length
       int headerLength = header.getSerializedSize();
       int protoBodyLength = msg.pBody.getSerializedSize();
-      int rawBodyLength = msg.dBody == null ? 0 : msg.dBody.readableBytes();
+      int rawBodyLength = msg.getRawBodySize();
       int fullLength = //
           HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength +   //
           PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; //
@@ -89,11 +92,15 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
       msg.pBody.writeTo(cos);
 
       // if exists, write data body and tag.
-      if(msg.dBody != null && msg.dBody.readableBytes() > 0){
+      // TODO: is it possible to avoid this copy, i think so...
+      if(msg.getRawBodySize() > 0){
+        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Writing raw body of size {}", msg.getRawBodySize());
         cos.writeRawVarint32(RAW_BODY_TAG);
         cos.writeRawVarint32(rawBodyLength);
         cos.flush(); // need to flush so that dbody goes after if cos is caching.
-        buf.writeBytes(msg.dBody);
+        for(int i =0; i < msg.dBodies.length; i++){
+          buf.writeBytes(msg.dBodies[i]);  
+        }
       }else{
         cos.flush();
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index ef1b88f..a0aed94 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -25,28 +25,23 @@ public class RpcExceptionHandler implements ChannelHandler{
   
   public RpcExceptionHandler(){
   }
-  
-  @Override
-  public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
-  }
 
-  @Override
-  public void afterAdd(ChannelHandlerContext ctx) throws Exception {
-  }
 
   @Override
-  public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    if(!ctx.channel().isOpen()) return;
+    logger.info("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+    ctx.close();
   }
 
+
   @Override
-  public void afterRemove(ChannelHandlerContext ctx) throws Exception {
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   }
 
+
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    if(!ctx.channel().isOpen()) return;
-    logger.info("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
-    ctx.close();
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
index fd1938d..08ea150 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
@@ -27,19 +27,14 @@ public abstract class RpcMessage {
   public RpcMode mode;
   public int rpcType;
   public int coordinationId;
-  public ByteBuf dBody;
   
-  public RpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf dBody) {
+  public RpcMessage(RpcMode mode, int rpcType, int coordinationId) {
     this.mode = mode;
     this.rpcType = rpcType;
     this.coordinationId = coordinationId;
-    this.dBody = dBody;
   }
   
   public abstract int getBodySize();
-
-  void release(){
-    if(dBody != null) dBody.release();
-  }
+  abstract void release();
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
new file mode 100644
index 0000000..fac908c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public abstract class RpcOutcomeListener<V> { // outcome callbacks: failed(RpcException) and success(V)
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+  // Both hooks have empty bodies here, so a subclass only has to override the outcome it cares about.
+  public void failed(RpcException ex){};
+  public void success(V value){};
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 462bc52..20a7d7d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.MessageBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.CorruptedFrameException;
@@ -30,12 +31,14 @@ import com.google.protobuf.CodedInputStream;
 public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZeroCopyProtobufLengthDecoder.class);
 
+  
   @Override
-  protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
+
     if(!ctx.channel().isOpen()){
       logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
       in.skipBytes(in.readableBytes());
-      return null;
+      return;
     }
     
     in.markReaderIndex();
@@ -43,7 +46,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
     for (int i = 0; i < buf.length; i ++) {
         if (!in.isReadable()) {
             in.resetReaderIndex();
-            return null;
+            return;
         }
 
         buf[i] = in.readByte();
@@ -60,13 +63,14 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
 
             if (in.readableBytes() < length) {
                 in.resetReaderIndex();
-                return null;
+                return;
             } else {
-                ByteBuf out = in.slice(in.readerIndex(), length);
+                ByteBuf outBuf = in.slice(in.readerIndex(), length);
                 in.retain();
                 in.skipBytes(length);
                 if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i+1, length));
-                return out;
+                out.add(outBuf);
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
new file mode 100644
index 0000000..ecaf8d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+public interface AvailabilityListener { // single-method callback that is handed a BitConnection
+  public void isAvailable(BitConnection connection); // NOTE(review): presumably invoked once the connection becomes usable; confirm against BitConnection
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index b16c6cb..4ba99a1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -18,45 +18,88 @@
 package org.apache.drill.exec.rpc.bit;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.rpc.BasicClient;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
 
 import com.google.protobuf.MessageLite;
 
-public class BitClient  extends BasicClient<RpcType>{
-  
-  private final DrillbitContext context;
+public class BitClient  extends BasicClient<RpcType, BitConnection>{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
+
   private final BitComHandler handler;
+  private final DrillbitEndpoint endpoint;
+  private BitConnection connection;
+  private final AvailabilityListener openListener;
+  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final ListenerPool listeners;
   
-  public BitClient(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
-    super(alloc, eventLoopGroup);
-    this.context = context;
+  public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+
+    this.endpoint = endpoint;
     this.handler = handler;
+    this.openListener = openListener;
+    this.registry = registry;
+    this.listeners = listeners;
   }
   
+  public BitHandshake connect() throws RpcException, InterruptedException{ // connects to the endpoint given at construction and returns the BitHandshake that connectAsClient yields
+    BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class); // our handshake carries only the local RPC version
+    connection.setEndpoint(endpoint); // NOTE(review): assumes initRemoteConnection() already ran inside connectAsClient; otherwise connection is still null here. Confirm.
+    return bs;
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
-  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    return handler.getResponseDefaultInstance(rpcType);
+  public BitConnection initRemoteConnection(Channel channel) {
+    this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+    return connection;
   }
 
   @Override
-  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return handler.handle(context, rpcType, pBody, dBody);
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
+    return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
-    return super.getCloseHandler(ch);
+  public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    return BitComDefaultInstanceHandler.getResponseDefaultInstance(rpcType);
   }
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
+  @Override
+  protected Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { // the connection parameter shadows the field of the same name
+    return handler.handle(connection, rpcType, pBody, dBody); // pure delegation to the BitComHandler supplied at construction
+  }
+
+  @Override
+  protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
+    return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
+      // Rejects a handshake whose RPC version differs from ours; "Expected" is the local version, "actual" is the peer's.
+      @Override
+      protected void validateHandshake(BitHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from bit server to bit client. {}", inbound);
+        if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+      }
+
+    };
+  }
+  
+  public BitConnection getConnection(){ // the field has no initializer, so this returns null until initRemoteConnection() has assigned it
+    return this.connection;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index 2349899..c60d36b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -17,73 +17,34 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.GenericFutureListener;
-
 import java.io.Closeable;
-import java.util.Collection;
 
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 
 /**
- * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a server
- * or a client depending on who initially made the connection. If no connection exists, BitCom is
- * responsible for making a connection.  BitCom should automatically straight route local BitCommunication rather than connecting to its self.
+ * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a
+ * server or a client depending on who initially made the connection. If no connection exists, BitCom is responsible for
+ * making a connection. BitCom should automatically route local BitCommunication directly rather than connecting to
+ * itself.
  */
-public interface BitCom extends Closeable{
+public interface BitCom extends Closeable {
 
   /**
-   * Routes the output of a RecordBatch to another node.  The record batch
-   * @param node The node id to send the record batch to.
-   * @param batch The record batch to send.
-   * @return A SendProgress object which can be used to monitor the sending of the batch.
-   */
-  public abstract DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch);
-
-  
-  /**
-   * Requests an iterator to access an incoming record batch.  
-   * @param fragmentId
-   * @return
-   */
-  public RecordBatch getReceivingRecordBatchHandle(int majorFragmentId, int minorFragmentId);
-  
-  /**
-   * Send a query PlanFragment to another bit.   
-   * @param context
+   * Get a Bit to Bit communication tunnel. If the BitCom doesn't have a tunnel attached to the node already, it will
+   * start creating one. This creates the connection asynchronously.
+   * 
    * @param node
-   * @param fragment
    * @return
    */
-  public abstract DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node, PlanFragment fragment);
+  public BitTunnel getTunnel(DrillbitEndpoint node) ;
 
-  public abstract void startQuery(Collection<DrillbitEndpoint> firstNodes, long queryId);
-    
-  
-  public abstract DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
-  
-  public abstract DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
-  
-  
-  public interface TunnelListener extends GenericFutureListener<ChannelFuture> {
-    public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus);
-  }
-  
-  public interface SendManager{
-    /**
-     * Sender responsible for regularly checking this value to see whether it should continue to send or yield the process
-     * @return
-     */
-    public boolean canContinue();
-  }
+  public int start() throws InterruptedException, DrillbitStartupException;
 
+  /**
+   * Register an incoming batch handler for a local foreman.  
+   * @param handler
+   */
+  public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
new file mode 100644
index 0000000..e1d4902
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.protobuf.MessageLite;
+
+public class BitComDefaultInstanceHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComDefaultInstanceHandler.class);
+  
+
+  public static MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { // maps a response rpc type number to its protobuf default instance
+    switch (rpcType) {
+    case RpcType.ACK_VALUE:
+      return Ack.getDefaultInstance();
+    case RpcType.HANDSHAKE_VALUE:
+      return BitHandshake.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
+      return FragmentHandle.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_STATUS_VALUE:
+      return FragmentStatus.getDefaultInstance();
+    case RpcType.RESP_BIT_STATUS_VALUE:
+      return BitStatus.getDefaultInstance();
+      
+    default: // any other rpc type has no response message registered here
+      throw new UnsupportedOperationException("No response default instance registered for rpcType " + rpcType);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
deleted file mode 100644
index b2c5cbb..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.bit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.socket.SocketChannel;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitBatchChunk;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.bit.BitCom.TunnelListener;
-import org.apache.drill.exec.rpc.bit.BitComImpl.TunnelModifier;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import com.google.protobuf.MessageLite;
-
-public class BitComHandler {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComHandler.class);
-  
-  private final TunnelModifier modifier;
-  
-  public BitComHandler(TunnelModifier modifier){
-    this.modifier = modifier;
-  }
-  
-  public TunnelListener getTunnelListener(RpcBus<?>.ChannelClosedHandler internalHandler){
-    return new Listener(internalHandler);
-  }
-  
-  public class Listener implements TunnelListener {
-    final RpcBus<?>.ChannelClosedHandler internalHandler;
-
-    public Listener(RpcBus<?>.ChannelClosedHandler internalHandler) {
-      this.internalHandler = internalHandler;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      logger.debug("BitTunnel closed, removing from BitCom.");
-      internalHandler.operationComplete(future);
-      BitTunnel t = modifier.remove(future.channel());
-      if(t != null) t.shutdownIfClient();
-    }
-
-    @Override
-    public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
-      modifier.create(channel, endpoint, bus);
-    }
-
-  }
-
-  
-
-
-  public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    switch (rpcType) {
-    case RpcType.ACK_VALUE:
-      return Ack.getDefaultInstance();
-    case RpcType.HANDSHAKE_VALUE:
-      return BitHandshake.getDefaultInstance();
-    case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
-      return FragmentHandle.getDefaultInstance();
-    case RpcType.RESP_FRAGMENT_STATUS_VALUE:
-      return FragmentStatus.getDefaultInstance();
-    case RpcType.RESP_BIT_STATUS_VALUE:
-      return BitStatus.getDefaultInstance();
-    case RpcType.RESP_BATCH_CHUNK_VALUE:
-      return BitBatchChunk.getDefaultInstance();
-      
-    default:
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  protected Response handle(DrillbitContext context, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    switch (rpcType) {
-    
-    case RpcType.HANDSHAKE_VALUE:
-      // parse incoming handshake.
-      // get endpoint information.
-      // record endpoint information in registry.
-      // respond with our handshake info.
-      return new Response(RpcType.HANDSHAKE, BitHandshake.getDefaultInstance(), null);
-      
-    case RpcType.REQ_BATCH_CHUNK_VALUE:
-      return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
-      
-    case RpcType.REQ_BIT_STATUS_VALUE:
-      return new Response(RpcType.RESP_BIT_STATUS, BitStatus.getDefaultInstance(), null);
-      
-    case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
-      return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
-
-    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      return new Response(RpcType.RESP_FRAGMENT_STATUS, FragmentStatus.getDefaultInstance(), null);
-      
-    case RpcType.REQ_INIATILIZE_FRAGMENT_VALUE:
-      return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
-      
-    case RpcType.REQ_RECORD_BATCH_VALUE:
-      return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
-      
-    default:
-      throw new UnsupportedOperationException();
-    }
-
-  }
-  
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index aada154..c98be44 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -17,138 +17,158 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import io.netty.channel.Channel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
-
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Manages communication tunnels between nodes.   
+ */
 public class BitComImpl implements BitCom {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
 
-  private Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newConcurrentMap();
-  private Map<SocketChannel, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
-  private Object lock = new Object();
-  private BitServer server;
-  private DrillbitContext context;
+  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
+  private final ListenerPool listeners;
+  private volatile BitServer server;
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  
+  // TODO: this executor should be removed.
+  private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
 
-  public BitComImpl(DrillbitContext context) {
+  public BitComImpl(BootStrapContext context, BitComHandler handler) {
+    super();
+    this.handler = handler;
     this.context = context;
+    this.listeners = new ListenerPool(8);
   }
 
   public int start() throws InterruptedException, DrillbitStartupException {
-    server = new BitServer(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), context);
+    server = new BitServer(handler, context, registry, listeners);
     int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
     return server.bind(port);
   }
 
-  private Future<BitTunnel> getNode(DrillbitEndpoint endpoint) {
-    return null;
+  private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
+    
+    
+    SettableFuture<BitConnection> future = SettableFuture.create();
+    BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
+    BitConnection t = null;
+
+    if (check) {
+      t = registry.get(endpoint);
+
+      if (t != null) {
+        future.set(t);
+        return checkedFuture;
+      }
+    }
     
-//    BitTunnel t = tunnels.get(endpoint);
-//    if (t == null) {
-//      synchronized (lock) {
-//        t = tunnels.get(endpoint);
-//        if (t != null) return t;
-//        BitClient c = new BitClient(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(),
-//            context.getBitLoopGroup(), context);
-//
-//        // need to figure what to do here with regards to waiting for handshake before returning. Probably need to add
-//        // future registry so that new endpoint registration ping the registry.
-//        throw new UnsupportedOperationException();
-//        c.connectAsClient(endpoint.getAddress(), endpoint.getBitPort()).await();
-//        t = new BitTunnel(c);
-//        tunnels.put(endpoint, t);
-//
-//      }
-//    }
-//    return null;
+    try {
+      AvailWatcher watcher = new AvailWatcher(future);
+      BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
+      c.connect();
+      return checkedFuture;
+    } catch (InterruptedException | RpcException e) {
+      future.setException(new FragmentSetupException("Unable to open connection"));
+      return checkedFuture;
+    }
+
   }
 
-  
+  private class AvailWatcher implements AvailabilityListener{
+    final SettableFuture<BitConnection> future;
+    
+    public AvailWatcher(SettableFuture<BitConnection> future) {
+      super();
+      this.future = future;
+    }
 
-  @Override
-  public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node,
-      PlanFragment fragment) {
-    return null;
+    @Override
+    public void isAvailable(BitConnection connection) {
+      future.set(connection);
+    }
+    
+  }
+  
+  BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
+    BitConnection t = registry.get(endpoint);
+    if(t != null) return t;
+    return this.getNode(endpoint, false).checkedGet();
   }
 
-  @Override
-  public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle) {
-    return null;
+  
+  CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
+    return this.getNode(endpoint, true);
   }
 
+  
   @Override
-  public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node,
-      FragmentHandle handle) {
-    return null;
+  public BitTunnel getTunnel(DrillbitEndpoint endpoint){
+    BitConnection t = registry.get(endpoint);
+    if(t != null){ // a connection is already registered for this endpoint: wrap it
+      return new BitTunnel(exec, endpoint, this, t);
+    }else{ // nothing registered: give the tunnel the future returned by getNode, which starts a new client connection
+      return new BitTunnel(exec, endpoint, this,  this.getNode(endpoint, false));
+    }
   }
 
-  private final TunnelModifier modifier = new TunnelModifier();
 
   /**
-   * Fully synchronized modifier. Contention should be low since endpoints shouldn't be constantly changing.
+   * A future which remaps exceptions to an RpcException.
+   * @param <T>
    */
-  class TunnelModifier {
-    public BitTunnel remove(Channel ch) {
-      synchronized (this) {
-        DrillbitEndpoint endpoint = endpoints.remove(ch);
-        if (endpoint == null) {
-          logger
-              .warn("We attempted to find a endpoint from a provided channel and found none.  This suggests a race condition or memory leak problem.");
-          return null;
-        }
-
-        BitTunnel tunnel = tunnels.remove(endpoint);
-        return tunnel;
-      }
+  private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
+
+    protected BitComFuture(ListenableFuture<T> delegate) {
+      super(delegate);
     }
 
-    public void create(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
-      synchronized (this) {
-        endpoints.put(channel, endpoint);
-        tunnels.put(endpoint, new BitTunnel(bus));
+    @Override
+    protected RpcException mapException(Exception e) {
+      Throwable t = e;
+      if(e instanceof ExecutionException){
+        t = e.getCause();
       }
+      
+      if(t instanceof RpcException) return (RpcException) t;
+      return new RpcException(t);
     }
   }
 
   public void close() {
     Closeables.closeQuietly(server);
-    for (BitTunnel bt : tunnels.values()) {
+    for (BitConnection bt : registry.values()) {
       bt.shutdownIfClient();
     }
   }
 
-
   @Override
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch) {
-    return null;
-  }
-
-  @Override
-  public RecordBatch getReceivingRecordBatchHandle(int majorFragmentId, int minorFragmentId) {
-    return null;
-  }
-
-  @Override
-  public void startQuery(Collection<DrillbitEndpoint> firstNodes, long queryId) {
+  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+    this.handler.registerIncomingFragmentHandler(handler);
   }
+  
+  
 
 }


Mime
View raw message