drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [04/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 01:52:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
new file mode 100644
index 0000000..73980f9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.RpcBus;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+/**
+ * One RPC connection between this bit and a remote bit, created for a single Netty channel.
+ *
+ * <p>Once the remote endpoint is known (see {@link #setEndpoint(DrillbitEndpoint)}) the connection tries to publish
+ * itself in the shared endpoint registry and informs the optional {@link AvailabilityListener}. Equality and hashing
+ * are based solely on a random id generated per instance.
+ */
+public class BitConnection extends RemoteConnection{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class);
+
+  private final RpcBus<RpcType, BitConnection> bus;
+  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final ListenerPool listeners;
+
+  private final AvailabilityListener listener;
+  private volatile DrillbitEndpoint endpoint;
+  private volatile boolean active = false;
+  private final UUID id;
+
+  /**
+   * @param listener  notified when a usable connection for the endpoint is known; may be null
+   * @param channel   channel handed to the {@link RemoteConnection} superclass
+   * @param bus       bus used to send every outbound message of this connection
+   * @param registry  shared map of endpoint to connection that this connection registers itself in
+   * @param listeners listener pool to expose; when null a private pool is created
+   */
+  public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+    super(channel);
+    this.bus = bus;
+    this.registry = registry;
+    // we use a local listener pool unless a global one is provided.
+    this.listeners = listeners != null ? listeners : new ListenerPool(2);
+    this.listener = listener;
+    this.id = UUID.randomUUID();
+  }
+
+  /** @return the remote endpoint, or null until {@link #setEndpoint(DrillbitEndpoint)} has been called */
+  protected DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  /** @return the listener pool supplied at construction, or the private one created in its absence */
+  public ListenerPool getListenerPool(){
+    return listeners;
+  }
+
+  /**
+   * Records the remote endpoint and attempts to register this connection for it.
+   *
+   * <p>If the registry already holds a connection for the endpoint, the listener (if any) is handed that existing
+   * connection and this connection is shut down when it belongs to a client bus. Otherwise this connection becomes
+   * the registered one, is marked active and is handed to the listener. The listener is notified exactly once.
+   *
+   * @param endpoint the remote endpoint; must not be null
+   * @throws NullPointerException     if {@code endpoint} is null
+   * @throws IllegalArgumentException if an endpoint has already been set
+   */
+  protected void setEndpoint(DrillbitEndpoint endpoint) {
+    Preconditions.checkNotNull(endpoint);
+    Preconditions.checkArgument(this.endpoint == null);
+
+    this.endpoint = endpoint;
+    logger.debug("Adding new endpoint to available BitServer connections.  Endpoint: {}.", endpoint);
+    synchronized(this){
+      BitConnection existing = registry.putIfAbsent(endpoint, this);
+
+      if(existing != null){ // the registry already has a connection like this
+
+        // give the awaiting future the connection that is actually registered.
+        if(listener != null){
+          listener.isAvailable(existing);
+        }
+
+        // Shut this down if this is a client, as it won't be available in the registry. A server-side duplicate is
+        // left open, possibly allowing two bits to use different tunnels to talk to each other.
+        logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
+        shutdownIfClient();
+
+        // Only a connection that was left open may be reported as active; do not notify the listener a second time.
+        active = !bus.isClient();
+        return;
+      }
+
+      active = true;
+      if(listener != null) listener.isAvailable(this);
+    }
+  }
+
+  /**
+   * Sends a record batch (header plus buffers) as a REQ_RECORD_BATCH message.
+   *
+   * @param context currently unused by this method
+   * @param batch   batch whose header and buffers are sent
+   * @return future for the remote Ack
+   */
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
+    return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
+  }
+
+  /** Sends a plan fragment as a REQ_INIATILIZE_FRAGMENT message and returns the future for the remote Ack. */
+  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
+    return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
+  }
+
+  /** Sends a REQ_CANCEL_FRAGMENT message for the given handle and returns the future for the remote Ack. */
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+    return bus.send(this,  RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+  }
+
+  /** Sends a REQ_FRAGMENT_STATUS message and returns the future for the remote Ack. */
+  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
+    return bus.send(this,  RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
+  }
+
+  /** Marks this connection as no longer active. */
+  public void disable(){
+    active = false;
+  }
+
+  /** @return true once the endpoint has been registered and until {@link #disable()} is called */
+  public boolean isActive(){
+    return active;
+  }
+
+  @Override
+  public int hashCode() {
+    // id is assigned in the constructor and is never null.
+    return 31 + id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || getClass() != obj.getClass()) return false;
+    return id.equals(((BitConnection) obj).id);
+  }
+
+  /**
+   * Wraps a close listener so that this connection is removed from the registry before the wrapped listener runs.
+   *
+   * @param parent listener invoked after the registry has been cleaned up
+   * @return the wrapping listener
+   */
+  public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
+    return new CloseHandler(this, parent);
+  }
+
+  /** Close listener that unregisters a connection and then delegates to a parent listener. */
+  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+    private BitConnection connection;
+    private GenericFutureListener<ChannelFuture> parent;
+
+    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+      super();
+      this.connection = connection;
+      this.parent = parent;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      // Two-argument remove: only drop the mapping if it still points at this very connection.
+      if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
+      parent.operationComplete(future);
+    }
+
+  }
+
+  /** Quietly closes the underlying bus, but only when that bus is a client. */
+  public void shutdownIfClient(){
+    if(bus.isClient()) Closeables.closeQuietly(bus);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
new file mode 100644
index 0000000..0160d24
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.common.util.concurrent.CheckedFuture;
+
+/**
+ * Hands out a usable {@link BitConnection} for one endpoint, requesting a new connection from {@link BitComImpl}
+ * when neither the cached connection nor the pending connection future yields an active one.
+ */
+public class BitConnectionManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
+
+  private final int maxAttempts;
+  private final BitComImpl com;
+  private final DrillbitEndpoint endpoint;
+  private final AtomicReference<BitConnection> connection;
+  private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+
+  /**
+   * @param endpoint    endpoint this manager connects to
+   * @param com         used to request a new connection when none is available
+   * @param connection  already established connection, or null
+   * @param future      pending connection attempt, or null
+   * @param maxAttempts highest attempt number after which a failure is propagated instead of retried
+   */
+  BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
+    assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
+    this.com = com;
+    this.connection =  new AtomicReference<BitConnection>(connection);
+    this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
+    this.endpoint = endpoint;
+    this.maxAttempts = maxAttempts;
+  }
+
+  /**
+   * Returns an active connection, waiting on (or creating) a connection future when necessary.
+   *
+   * <p>A failed future, or one that resolved to a connection that is no longer active, is discarded and a fresh
+   * connection is requested. Each such failure consumes one attempt.
+   *
+   * @param attempt number of attempts already consumed by the caller (normally 0)
+   * @return an active connection, never null
+   * @throws RpcException if no active connection could be obtained before the attempt number reached maxAttempts
+   */
+  BitConnection getConnection(int attempt) throws RpcException{
+    int tries = attempt;
+    while(true){
+      BitConnection cached = connection.get();
+      if(cached != null){
+        if(cached.isActive()) return cached;
+        connection.compareAndSet(cached, null);
+      }
+
+      CheckedFuture<BitConnection, RpcException> pending = future.get();
+      if(pending == null){
+        // no checked future, let's make one. If another thread won the race we simply pick up its future next round.
+        future.compareAndSet(null, com.getConnectionAsync(endpoint));
+        continue;
+      }
+
+      RpcException failure;
+      try{
+        BitConnection established = pending.checkedGet();
+        if(established != null && established.isActive()) return established;
+        // The future resolved, but to a connection that has since been disabled: treat it like a failed attempt
+        // so that the stale future is dropped instead of being returned forever.
+        failure = new RpcException(String.format("Connection to %s:%d is not active.", endpoint.getAddress(), endpoint.getBitPort()));
+      }catch(RpcException ex){
+        failure = ex;
+      }
+
+      future.compareAndSet(pending, null);
+      if(tries >= maxAttempts) throw failure;
+      tries++;
+    }
+  }
+
+  /** @return the endpoint this manager connects to */
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
new file mode 100644
index 0000000..32fd4f9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitRpcConfig.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConfig;
+
+/**
+ * Constants describing the bit-to-bit RPC protocol: the request/response type mapping, the protocol version and a
+ * reusable positive acknowledgement.
+ */
+public class BitRpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitRpcConfig.class);
+
+  /** Maps every bit RPC request type and message class to its response type and message class. */
+  public static final RpcConfig MAPPING = RpcConfig.newBuilder("BIT-RPC-MAPPING") //
+      .add(RpcType.HANDSHAKE, BitHandshake.class, RpcType.HANDSHAKE, BitHandshake.class)
+      .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+      .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+      .build();
+
+  /** Version of the bit RPC protocol; final so that no class can reassign it at runtime. */
+  public static final int RPC_VERSION = 2;
+
+  /** Shared ACK response carrying {@code Acks.OK}. */
+  public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index e17b25c..88ac6cc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -18,47 +18,76 @@
 package org.apache.drill.exec.rpc.bit;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
 
 import com.google.protobuf.MessageLite;
 
-public class BitServer extends BasicServer<RpcType>{
+public class BitServer extends BasicServer<RpcType, BitConnection>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
   
-  private final DrillbitContext context;
   private final BitComHandler handler;
+  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final ListenerPool listeners;
   
-  public BitServer(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
-    super(alloc, eventLoopGroup);
-    this.context = context;
+  public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
     this.handler = handler;
+    this.registry = registry;
+    this.listeners = listeners;
   }
   
   @Override
-  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    return handler.getResponseDefaultInstance(rpcType);
+  public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    return BitComDefaultInstanceHandler.getResponseDefaultInstance(rpcType);
   }
 
   @Override
-  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    return handler.handle(context, rpcType, pBody, dBody);
+  protected Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    return handler.handle(connection, rpcType, pBody, dBody);
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
-    
-    return super.getCloseHandler(ch);
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
+    return connection.getCloseHandler(super.getCloseHandler(connection));
+  }
+
+  /**
+   * Creates the connection object for a channel of this server. No availability listener is attached; the server's
+   * registry and listener pool are passed along.
+   */
+  @Override
+  public BitConnection initRemoteConnection(Channel channel) {
+    final BitConnection inbound = new BitConnection(null, channel, this, registry, listeners);
+    return inbound;
+  }
+  
+  
+  /**
+   * Builds the handler that answers the handshake message of another bit.
+   *
+   * @return a handler that rejects a peer whose RPC version differs from {@link BitRpcConfig#RPC_VERSION} and
+   *         otherwise replies with this bit's RPC version
+   */
+  @Override
+  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+    return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
+
+      @Override
+      public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from other bit. {}", inbound);
+        if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION){
+          // "Expected" is the version this server speaks; "actual" is the version the peer announced.
+          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+        }
+        return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
+      }
+
+    };
   }
 
+
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 02991ad..652fa52 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,47 +17,222 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Interface provided for communication between two bits.  Provided by both a server and a client implementation.
+ * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
+ * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
+ * and action. A better approach should be found.
  */
 public class BitTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
 
-  final RpcBus<?> bus;
+  private static final int MAX_ATTEMPTS = 3;
 
-  public BitTunnel(RpcBus<?> bus){
-    this.bus = bus;
-  }
+  private final BitConnectionManager manager;
+  private final Executor exec;
   
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, RecordBatch batch){
-    return null;
+
+  /**
+   * Creates a tunnel on top of an already established connection.
+   *
+   * @param exec       executor that runs the submitted commands
+   * @param endpoint   remote endpoint of the tunnel
+   * @param com        used by the connection manager to request new connections
+   * @param connection connection to start with
+   */
+  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
+    this.exec = exec;
+    this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
   }
-  
-  public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, PlanFragment fragment){
-    return null;
+
+  /**
+   * Creates a tunnel whose connection is still being established.
+   *
+   * @param exec     executor that runs the submitted commands
+   * @param endpoint remote endpoint of the tunnel
+   * @param com      used by the connection manager to request new connections
+   * @param future   pending connection attempt to start with
+   */
+  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
+      CheckedFuture<BitConnection, RpcException> future) {
+    this.exec = exec;
+    this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
   }
   
-  public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, FragmentHandle handle){
-    return null;
+  /** @return the remote endpoint, as reported by the connection manager */
+  public DrillbitEndpoint getEndpoint(){
+    final DrillbitEndpoint remote = manager.getEndpoint();
+    return remote;
   }
-  
-  public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, FragmentHandle handle){
-    return null;
+
+  /** Hands the command to the executor and returns the same command as the caller-visible future. */
+  private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
+    final DrillRpcFuture<T> pending = command;
+    exec.execute(command);
+    return pending;
+  }
+
+  /** Submits a {@link SendBatch} command for the given batch and returns its future. */
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
+    final SendBatch command = new SendBatch(batch, context);
+    return submit(command);
+  }
+
+  /** Submits a {@link SendFragment} command for the given plan fragment and returns its future. */
+  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
+    final SendFragment command = new SendFragment(fragment);
+    return submit(command);
+  }
+
+  /** Submits a {@link CancelFragment} command for the given handle and returns its future. */
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
+    final CancelFragment command = new CancelFragment(handle);
+    return submit(command);
   }
+
+  /** Submits a {@link SendFragmentStatus} command for the given status and returns its future. */
+  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
+    final SendFragmentStatus command = new SendFragmentStatus(status);
+    return submit(command);
+  }
+
+  /** Command that sends one record batch over whichever connection the tunnel obtains. */
+  public class SendBatch extends BitCommand<Ack> {
+    final FragmentWritableBatch batch;
+    final FragmentContext context;
+
+    public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
+      this.context = context;
+      this.batch = batch;
+    }
+
+    /** Logs the batch at debug level, then delegates to {@link BitConnection#sendRecordBatch}. */
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      logger.debug("Sending record batch. {}", batch);
+      final CheckedFuture<Ack, RpcException> ack = connection.sendRecordBatch(context, batch);
+      return ack;
+    }
+
+  }
+
+  /** Command that sends one fragment status over whichever connection the tunnel obtains. */
+  public class SendFragmentStatus extends BitCommand<Ack> {
+    final FragmentStatus status;
+
+    public SendFragmentStatus(FragmentStatus status) {
+      this.status = status;
+    }
+
+    /** Delegates to {@link BitConnection#sendFragmentStatus}. */
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> ack = connection.sendFragmentStatus(status);
+      return ack;
+    }
+  }
+
+  /** Command that asks the remote bit to cancel the fragment identified by a handle. */
+  public class CancelFragment extends BitCommand<Ack> {
+    final FragmentHandle handle;
+
+    public CancelFragment(FragmentHandle handle) {
+      this.handle = handle;
+    }
+
+    /** Delegates to {@link BitConnection#cancelFragment}. */
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> ack = connection.cancelFragment(handle);
+      return ack;
+    }
+
+  }
+
+  /** Command that sends one plan fragment over whichever connection the tunnel obtains. */
+  public class SendFragment extends BitCommand<Ack> {
+    final PlanFragment fragment;
+
+    public SendFragment(PlanFragment fragment) {
+      this.fragment = fragment;
+    }
+
+    /** Delegates to {@link BitConnection#sendFragment}. */
+    @Override
+    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
+      final CheckedFuture<Ack, RpcException> ack = connection.sendFragment(fragment);
+      return ack;
+    }
+
+  }
+
+
   
-  public void shutdownIfClient(){
-    if(bus.isClient()) Closeables.closeQuietly(bus);
+
+  /**
+   * Base class of all tunnel commands. A command is both the task that performs the RPC (via {@link #run()}) and the
+   * future the caller receives; the future is backed by a {@link SettableFuture} that is completed once the
+   * underlying RPC completes or fails.
+   *
+   * @param <T> type of the RPC response
+   */
+  private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
+
+    public BitCommand() {
+      super(SettableFuture.<T> create());
+    }
+
+    /**
+     * Registers an outcome listener that is run on the thread completing this future.
+     *
+     * @param outcomeListener receives either the response or the failure
+     */
+    public void addLightListener(RpcOutcomeListener<T> outcomeListener){
+      this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+    }
+
+    /**
+     * Performs the actual RPC on the given connection.
+     *
+     * @param connection connection obtained from the connection manager
+     * @return the future of the RPC
+     */
+    public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
+
+    /**
+     * Obtains a connection, issues the RPC and bridges its outcome into this command's future. Any exception raised
+     * while doing so completes the future exceptionally, so that callers waiting on it are never left blocked.
+     */
+    public final void run() {
+      final SettableFuture<T> result = (SettableFuture<T>) delegate();
+
+      try {
+
+        BitConnection connection = manager.getConnection(0);
+        assert connection != null : "The connection manager should never return a null connection.  Worse case, it should throw an exception.";
+        CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
+        rpc.addListener(new FutureBridge<T>(result, rpc), MoreExecutors.sameThreadExecutor());
+      } catch (RpcException ex) {
+        result.setException(ex);
+      } catch (RuntimeException ex) {
+        // Without this an unchecked failure would escape into the executor and the future would never complete.
+        result.setException(ex);
+      }
+
+    }
+
+    /** Unwraps an {@link ExecutionException} and converts whatever remains into an {@link RpcException}. */
+    @Override
+    protected RpcException mapException(Exception e) {
+      Throwable t = e;
+      if (e instanceof ExecutionException) {
+        t = e.getCause();
+      }
+      if (t instanceof RpcException) return (RpcException) t;
+      return new RpcException(t);
+    }
+
+    /** Adapts an {@link RpcOutcomeListener} to the {@link Runnable} listener form of the future. */
+    public class RpcOutcomeListenerWrapper implements Runnable{
+      final RpcOutcomeListener<T> inner;
+
+      public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
+        this.inner = inner;
+      }
+
+      @Override
+      public void run() {
+        try{
+          inner.success(BitCommand.this.checkedGet());
+        }catch(RpcException e){
+          inner.failed(e);
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "BitCommand ["+this.getClass().getSimpleName()+"]";
+    }
+
+  }
+
+  /**
+   * Listener that copies the outcome of one future (value or {@link RpcException}) into a {@link SettableFuture}.
+   *
+   * @param <T> type of the value carried by both futures
+   */
+  private class FutureBridge<T> implements Runnable {
+    final SettableFuture<T> out;
+    final CheckedFuture<T, RpcException> in;
+
+    public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
+      this.in = in;
+      this.out = out;
+    }
+
+    @Override
+    public void run() {
+      final T value;
+      try {
+        value = in.checkedGet();
+      } catch (RpcException failure) {
+        out.setException(failure);
+        return;
+      }
+      out.set(value);
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
new file mode 100644
index 0000000..8f299d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.work.foreman.FragmentStatusListener;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+
+/**
+ * Registry of fragment status listeners keyed by fragment handle. Incoming status messages are routed to the
+ * listener registered for the handle they carry.
+ */
+public class ListenerPool {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
+
+  private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+
+  /**
+   * @param par concurrency level passed to the backing {@link ConcurrentHashMap}
+   */
+  public ListenerPool(int par){
+    this.listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+  }
+
+  /** Drops the listener registered for the handle, if there is one. */
+  public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+    listeners.remove(handle);
+  }
+
+  /**
+   * Registers a listener for a handle that has none yet.
+   *
+   * @throws RpcException if a listener is already registered for the handle
+   */
+  public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
+    final boolean alreadyRegistered = listeners.putIfAbsent(handle, listener) != null;
+    if(alreadyRegistered){
+      throw new RpcException("Failure.  The provided handle already exists in the listener pool.  You need to remove one listener before adding another.");
+    }
+  }
+
+  /** Forwards the status to the listener registered for its handle; logs and ignores it when there is none. */
+  public void status(FragmentStatus status){
+    final FragmentStatusListener target = listeners.get(status.getHandle());
+    if(target != null){
+      target.statusUpdate(status);
+      return;
+    }
+    logger.info("A fragment message arrived but there was no registered listener for that message.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
new file mode 100644
index 0000000..3df88b7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+
+/**
+ * Immutable pairing of a QueryResult header with the data buffer that accompanied it. The buffer
+ * reference may be null; use {@link #hasData()} to check.
+ */
+public class QueryResultBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
+
+  private final QueryResult header;
+  private final ByteBuf data;
+
+  public QueryResultBatch(QueryResult header, ByteBuf data) {
+    this.data = data;
+    this.header = header;
+  }
+
+  /** @return true when a data buffer reference was supplied at construction. */
+  public boolean hasData() {
+    return !(data == null);
+  }
+
+  /** @return the data buffer given at construction, possibly null. */
+  public ByteBuf getData() {
+    return data;
+  }
+
+  /** @return the result header given at construction. */
+  public QueryResult getHeader() {
+    return header;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 0088522..5d2e799 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,57 +20,194 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
 import com.google.protobuf.MessageLite;
 
-public class UserClient extends BasicClient<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
-  
+
+  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
   public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(alloc, eventLoopGroup);
+    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
   }
 
+  public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
+    this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
+    return resultsListener.getFuture();
+  }
 
-  public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query) throws RpcException {
-    return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, null);
+  public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
+    return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
   }
   
-  public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query, ByteBuf data) throws RpcException {
-    return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, data);
+  /**
+   * Stand-in listener that queues result batches until a real listener is supplied through
+   * {@link #transferTo(UserResultsListener)}; after that, arriving batches are forwarded directly.
+   */
+  private class BufferingListener extends UserResultsListener {
+
+    private final ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private volatile UserResultsListener output;
+
+    /**
+     * Replays every queued batch into the given listener and makes it the target of later batches.
+     * If this buffering listener's future is already done, the target's future is completed as well.
+     *
+     * @param l listener that takes over result delivery.
+     * @return the last-chunk flag of the final replayed batch, or false when nothing was queued.
+     */
+    public boolean transferTo(UserResultsListener l) {
+      lock.writeLock().lock();
+      try {
+        output = l;
+        boolean last = false;
+        for (QueryResultBatch r : results) {
+          l.resultArrived(r);
+          last = r.getHeader().getIsLastChunk();
+        }
+        if (future.isDone()) {
+          l.set();
+        }
+        return last;
+      } finally {
+        // The write lock must be released; otherwise resultArrived() on any other thread would block
+        // forever waiting for the read lock.
+        lock.writeLock().unlock();
+      }
+    }
+
+    /**
+     * Queues the batch while no target listener is set, otherwise forwards it to the target. The read
+     * lock keeps this from interleaving with a transfer in progress.
+     */
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      logger.debug("Result arrvied.");
+      lock.readLock().lock();
+      try {
+        if (output == null) {
+          this.results.add(result);
+        } else {
+          output.resultArrived(result);
+        }
+
+      } finally {
+        lock.readLock().unlock();
+      }
+
+    }
+
+    /** Always throws: submission failures are not meaningful for a buffering listener. */
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+
+  }
+
+  private class SubmissionListener extends RpcOutcomeListener<QueryId> {
+    private UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    /**
+     * Registers the caller's listener under the QueryId returned by the server. If a
+     * BufferingListener is already registered for that id, its queued batches are transferred to the
+     * caller's listener, which then either replaces it in the map or, when the last chunk was already
+     * among the transferred batches, the map entry is removed.
+     *
+     * @throws IllegalStateException if a non-buffering listener is already registered for the id, or
+     *           the buffering listener could not be replaced.
+     */
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+
+      // we need to deal with the situation where we already received results by the time we got the query id back. In
+      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
+      // results during the transition
+      if (oldListener != null) {
+        logger.debug("Unable to place user results listener, buffering listener was already in place.");
+        if (oldListener instanceof BufferingListener) {
+          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+          if (all) {
+            // Everything has been delivered, so drop the entry. The map is keyed by QueryId: removal has
+            // to name the key (passing the listener as a key matches nothing and leaks the entry).
+            resultsListener.remove(queryId, oldListener);
+          } else {
+            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+            if (!replaced) throw new IllegalStateException();
+          }
+        } else {
+          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+        }
+      }
+
+    }
+
   }
 
   @Override
   protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
-    switch(rpcType){
+    switch (rpcType) {
     case RpcType.ACK_VALUE:
       return Ack.getDefaultInstance();
     case RpcType.HANDSHAKE_VALUE:
       return BitToUserHandshake.getDefaultInstance();
     case RpcType.QUERY_HANDLE_VALUE:
-      return QueryHandle.getDefaultInstance();
+      return QueryId.getDefaultInstance();
     case RpcType.QUERY_RESULT_VALUE:
       return QueryResult.getDefaultInstance();
     }
     throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
   }
 
+  /**
+   * Handles a message sent from the server to this client. Only QUERY_RESULT messages are accepted:
+   * the result is wrapped in a QueryResultBatch, delivered to the listener registered for its query
+   * id, and acknowledged with an ACK response.
+   *
+   * @param rpcType numeric rpc type of the inbound message.
+   * @param pBody buffer holding the protobuf body.
+   * @param dBody buffer holding the accompanying data body; passed to QueryResultBatch as-is.
+   * @return an ACK response for a QUERY_RESULT message.
+   * @throws RpcException for any rpc type other than QUERY_RESULT.
+   */
+  protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    switch (rpcType) {
+    case RpcType.QUERY_RESULT_VALUE:
+      final QueryResult result = get(pBody, QueryResult.PARSER);
+      final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+      UserResultsListener l = resultsListener.get(result.getQueryId());
+//      logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
+      if (l != null) {
+//        logger.debug("Results listener available, using existing.");
+        l.resultArrived(batch);
+        if (result.getIsLastChunk()) {
+          // last chunk: unregister the listener (only if it is still the mapped one) and complete its future.
+          resultsListener.remove(result.getQueryId(), l);
+          l.set();
+        }
+      } else {
+        logger.debug("Results listener not available, creating a buffering listener.");
+        // manage race condition where we start getting results before we receive the queryid back.
+        BufferingListener bl = new BufferingListener();
+        // putIfAbsent returns the listener another thread registered in the meantime, or null if bl was stored.
+        l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+        if (l != null) {
+          l.resultArrived(batch);
+        } else {
+          bl.resultArrived(batch);
+        }
+        // NOTE(review): the last-chunk flag is not inspected on this path, so no future is completed here -- confirm this is intended.
+      }
+
+      return new Response(RpcType.ACK, Ack.getDefaultInstance());
+    default:
+      throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
+    }
+
+  }
 
   @Override
-  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
-    logger.debug("Received a server > client message of type " + rpcType);
-    return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+  protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
+    return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+
+      /**
+       * Checks the handshake received from the server.
+       *
+       * @throws RpcException if the server's rpc version differs from UserRpcConfig.RPC_VERSION.
+       */
+      @Override
+      protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from bit to user. {}", inbound);
+        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
+          // "Expected" is this client's version, "actual" is what the remote side announced.
+          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
+              UserRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+        }
+      }
+
+    };
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
new file mode 100644
index 0000000..3ce14f0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import java.util.concurrent.Future;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Base class for receivers of query results. Subclasses implement the two callbacks; the class itself
+ * carries a future that is completed through {@link #set()}.
+ */
+public abstract class UserResultsListener {
+  SettableFuture<Void> future = SettableFuture.create();
+
+  /** Called for each result batch delivered to this listener. */
+  public abstract void resultArrived(QueryResultBatch result);
+
+  /** Called when submitting the query failed. */
+  public abstract void submissionFailed(RpcException ex);
+
+  /** @return the future completed by {@link #set()}. */
+  Future<Void> getFuture() {
+    return future;
+  }
+
+  /** Completes the future with a null value. */
+  final void set() {
+    future.set(null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
new file mode 100644
index 0000000..893e432
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.RpcConfig;
+
+/**
+ * Shared constants for the user &lt;-&gt; bit RPC layer: the request/response type mapping and the
+ * protocol version exchanged during the handshake.
+ */
+public class UserRpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
+
+  /** Request type/class to response type/class pairs; final so no caller can swap the mapping at runtime. */
+  public static final RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
+      .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
+      .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
+      .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
+      .build();
+
+  /** Version number sent and checked in both handshake directions. */
+  public static final int RPC_VERSION = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index cccaa55..406afc4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,31 +19,36 @@ package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.user.UserWorker;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
-public class UserServer extends BasicServer<RpcType> {
+public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
-  
-  final DrillbitContext context;
-  
-  public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
-    super(alloc, eventLoopGroup);
-    this.context = context;
+
+  final UserWorker worker;
+
+  public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, UserWorker worker) {
+    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
+    this.worker = worker;
   }
 
   @Override
@@ -55,36 +60,70 @@ public class UserServer extends BasicServer<RpcType> {
     default:
       throw new UnsupportedOperationException();
     }
-
   }
 
-  public DrillRpcFuture<QueryResult> sendResult(RunQuery query, ByteBuf data) throws RpcException {
-    return this.send(RpcType.QUERY_RESULT, query, QueryResult.class, data);
-  }
-  
-  
   @Override
-  protected Response handle(SocketChannel channel, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+  protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
+      throws RpcException {
     switch (rpcType) {
 
     case RpcType.HANDSHAKE_VALUE:
-//      logger.debug("Received handshake, responding in kind.");
-      return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance(), null);
-      
+      // logger.debug("Received handshake, responding in kind.");
+      return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance());
+
     case RpcType.RUN_QUERY_VALUE:
-//      logger.debug("Received query to run.  Returning query handle.");
-      return new Response(RpcType.QUERY_HANDLE, QueryHandle.newBuilder().setQueryId(1).build(), null);
-      
+      // logger.debug("Received query to run.  Returning query handle.");
+      try {
+        RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding RunQuery body.", e);
+      }
+
     case RpcType.REQUEST_RESULTS_VALUE:
-//      logger.debug("Received results requests.  Returning empty query result.");
-      return new Response(RpcType.QUERY_RESULT, QueryResult.getDefaultInstance(), null);
-      
+      // logger.debug("Received results requests.  Returning empty query result.");
+      try {
+        RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding RequestResults body.", e);
+      }
+
     default:
       throw new UnsupportedOperationException();
     }
 
   }
+
+  /**
+   * Remote connection type used by this server for a connected user client. Being an inner class, it
+   * sends through the enclosing UserServer.
+   */
+  public class UserClientConnection extends RemoteConnection {
+    public UserClientConnection(Channel channel) {
+      super(channel);
+    }
+
+    /**
+     * Sends a result batch over this connection as a QUERY_RESULT message: the batch's header is the
+     * message body and its buffers are passed along as the data payload.
+     *
+     * @return future for the Ack response.
+     */
+    public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
+      return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+    }
+
+  }
+
+  @Override
+  public UserClientConnection initRemoteConnection(Channel channel) {
+    return new UserClientConnection(channel);
+  }
   
-  
+  @Override
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+    return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
+
+      /**
+       * Validates the client's handshake and builds the reply carrying this server's rpc version.
+       *
+       * @throws RpcException if the client's rpc version differs from UserRpcConfig.RPC_VERSION.
+       */
+      @Override
+      public MessageLite getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
+        logger.debug("Handling handshake from user to bit. {}", inbound);
+        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
+          // "Expected" is this server's version, "actual" is what the client announced.
+          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
+              UserRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+        }
+        return BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build();
+      }
 
+    };
+  }
+
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
new file mode 100644
index 0000000..3c4d9af
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+
+import com.yammer.metrics.MetricRegistry;
+
+/**
+ * Holds the objects created directly from a DrillConfig at start-up: the config itself, a Netty event
+ * loop group, a metrics registry and a buffer allocator.
+ */
+public class BootStrapContext implements Closeable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
+
+  private final DrillConfig config;
+  private final NioEventLoopGroup loop;
+  private final MetricRegistry metrics;
+  private final BufferAllocator allocator;
+
+  /**
+   * Creates the event loop group (4 threads named with the "BitServer-" factory), the metrics registry
+   * named by the configured metrics context, and the allocator for the given config.
+   */
+  public BootStrapContext(DrillConfig config) {
+    this.config = config;
+    this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+    final String metricsContext = config.getString(ExecConstants.METRICS_CONTEXT_NAME);
+    this.metrics = new MetricRegistry(metricsContext);
+    this.allocator = BufferAllocator.getAllocator(config);
+  }
+
+  /** Shuts down the event loop group, then closes the allocator. */
+  public void close() {
+    loop.shutdown();
+    allocator.close();
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public NioEventLoopGroup getBitLoopGroup() {
+    return loop;
+  }
+
+  public DrillConfig getConfig() {
+    return config;
+  }
+
+  public MetricRegistry getMetrics() {
+    return metrics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c33afce..7d745e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -17,10 +17,9 @@
  ******************************************************************************/
 package org.apache.drill.exec.server;
 
-import java.net.InetAddress;
+import java.io.Closeable;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.HazelCache;
@@ -28,15 +27,16 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.service.ServiceEngine;
+import org.apache.drill.exec.work.WorkManager;
 
 import com.google.common.io.Closeables;
 
 /**
  * Starts, tracks and stops all the required services for a Drillbit daemon to work.
  */
-public class Drillbit {
+public class Drillbit implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
 
   public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
@@ -47,7 +47,7 @@ public class Drillbit {
     Drillbit bit;
     try {
       logger.debug("Setting up Drillbit.");
-      bit = new Drillbit(config);
+      bit = new Drillbit(config, null);
     } catch (Exception ex) {
       throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
     }
@@ -65,35 +65,37 @@ public class Drillbit {
     start(options);
   }
 
-  private final DrillbitContext context;
-  final BufferAllocator pool;
   final ClusterCoordinator coord;
   final ServiceEngine engine;
   final DistributedCache cache;
-  final DrillConfig config;
-  private RegistrationHandle handle;
-
-  public Drillbit(DrillConfig config) throws Exception {
-    final DrillbitContext context = new DrillbitContext(config, this);
-    Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
-    this.context = context;
-    this.pool = BufferAllocator.getAllocator(context);
-    this.coord = new ZKClusterCoordinator(config);
-    this.engine = new ServiceEngine(context);
-    this.cache = new HazelCache(context.getConfig());
-    this.config = config;
+  final WorkManager manager;
+  final BootStrapContext context;
+  
+  private volatile RegistrationHandle handle;
+
+  /**
+   * Builds a Drillbit. When a service set is supplied, its coordinator and cache are used; when it is
+   * null, a shutdown hook is installed and a ZooKeeper coordinator and Hazelcast cache are created
+   * from the config.
+   */
+  public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+    final boolean standalone = (serviceSet == null);
+    if (standalone) {
+      Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+    }
+    this.context = new BootStrapContext(config);
+    this.manager = new WorkManager(context);
+    this.coord = standalone ? new ZKClusterCoordinator(config) : serviceSet.getCoordinator();
+    this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
+    this.cache = standalone ? new HazelCache(config) : serviceSet.getCache();
   }
 
   public void run() throws Exception {
-    coord.start();
-    engine.start();
-    DrillbitEndpoint md = DrillbitEndpoint.newBuilder()
-      .setAddress(InetAddress.getLocalHost().getHostAddress())
-      .setBitPort(engine.getBitPort())
-      .setUserPort(engine.getUserPort())
-      .build();
+    coord.start(10000);
+    DrillbitEndpoint md = engine.start();
+    cache.run();
+    manager.start(md, cache, engine.getBitCom(), coord);
     handle = coord.register(md);
-    cache.run(md);
   }
 
   public void close() {
@@ -107,7 +109,8 @@ public class Drillbit {
 
     Closeables.closeQuietly(engine);
     Closeables.closeQuietly(coord);
-    Closeables.closeQuietly(pool);
+    Closeables.closeQuietly(manager);
+    Closeables.closeQuietly(context);
     logger.info("Shutdown completed.");
   }
 
@@ -123,5 +126,11 @@ public class Drillbit {
     }
 
   }
+  public ClusterCoordinator getCoordinator(){
+    return coord;
+  }
 
+  public DrillbitContext getContext(){
+    return this.manager.getContext();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b08b070..d5aaab2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -23,42 +23,60 @@ import java.util.Collection;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.store.StorageEngine;
 
+import com.google.common.base.Preconditions;
 import com.yammer.metrics.MetricRegistry;
 
 public class DrillbitContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+  private BootStrapContext context;
+
+  private PhysicalPlanReader reader;
+  private final ClusterCoordinator coord;
+  private final BitCom com;
+  private final DistributedCache cache;
+  private final DrillbitEndpoint endpoint;
   
-  private final DrillConfig config;
-  private final Drillbit underlyingBit;
-  private final NioEventLoopGroup loop;
-  private final MetricRegistry metrics;
-  
-  public DrillbitContext(DrillConfig config, Drillbit underlyingBit) {
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, BitCom com, DistributedCache cache) { // every argument is required and is null-checked below
     super();
-    this.config = config;
-    this.underlyingBit = underlyingBit;
-    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
-    this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
+    Preconditions.checkNotNull(endpoint);
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(coord); // fail fast: getBits() dereferences coord
+    Preconditions.checkNotNull(com);
+    Preconditions.checkNotNull(cache); // fail fast: getCache() would otherwise hand out null
+    
+    this.context = context;
+    this.coord = coord;
+    this.com = com;
+    this.cache = cache;
+    this.endpoint = endpoint;
+    this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint); // plan reader is built once, from the config, its mapper and this endpoint
+  }
+  
+  public DrillbitEndpoint getEndpoint(){ // returns the endpoint that was passed to the constructor
+    return endpoint;
   }
   
   public DrillConfig getConfig() {
-    return config;
+    return context.getConfig(); // configuration is read from the BootStrapContext instead of a local field
   }
   
   public Collection<DrillbitEndpoint> getBits(){
-    return underlyingBit.coord.getAvailableEndpoints();
+    return coord.getAvailableEndpoints(); // asks the injected coordinator directly instead of reaching through the Drillbit
   }
 
   public BufferAllocator getAllocator(){
-    return underlyingBit.pool;
+    return context.getAllocator(); // the allocator is supplied by the BootStrapContext
   }
   
   public StorageEngine getStorageEngine(StorageEngineConfig config){
@@ -66,19 +84,23 @@ public class DrillbitContext {
   }
   
   public NioEventLoopGroup getBitLoopGroup(){
-    return loop;
+    return context.getBitLoopGroup(); // the event loop group comes from the BootStrapContext; this class no longer creates one
   }
   
   public BitCom getBitCom(){
-    return underlyingBit.engine.getBitCom();
+    return com; // the BitCom instance that was passed to the constructor
   }
   
   public MetricRegistry getMetrics(){
-    return metrics;
+    return context.getMetrics(); // the metric registry is obtained from the BootStrapContext
   }
   
   public DistributedCache getCache(){
-    return underlyingBit.cache;
+    return cache; // the DistributedCache that was passed to the constructor
+  }
+  
+  public PhysicalPlanReader getPlanReader(){
+    return reader; // created once in the constructor from the config, its mapper and this bit's endpoint
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
new file mode 100644
index 0000000..0337a68
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.LocalClusterCoordinator;
+
+public class RemoteServiceSet implements Closeable{ // bundles a DistributedCache with a ClusterCoordinator and closes them together
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
+  
+  private final DistributedCache cache;
+  private final ClusterCoordinator coordinator;
+  
+  public RemoteServiceSet(DistributedCache cache, ClusterCoordinator coordinator) { // stores both services exactly as given
+    this.cache = cache;
+    this.coordinator = coordinator;
+  }
+
+  public DistributedCache getCache() {
+    return cache;
+  }
+
+  public ClusterCoordinator getCoordinator() {
+    return coordinator;
+  }
+  
+  @Override
+  public void close() throws IOException { // closes the cache first, then the coordinator
+    try {
+      cache.close();
+    } finally {
+      coordinator.close(); // runs even when cache.close() throws, so the coordinator is never left open
+    }
+  }
+
+  public static RemoteServiceSet getLocalServiceSet(){ // builds a set from a new LocalCache and a new LocalClusterCoordinator
+    return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 5d83bdb..d6d3b9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -17,49 +17,48 @@
  ******************************************************************************/
 package org.apache.drill.exec.service;
 
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.rpc.bit.BitComImpl;
 import org.apache.drill.exec.rpc.user.UserServer;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.common.io.Closeables;
 
 public class ServiceEngine implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
   
-  UserServer userServer;
-  BitComImpl bitCom;
-  int userPort;
-  int bitPort;
-  DrillbitContext context;
+  private final UserServer userServer;
+  private final BitCom bitCom;
+  private final DrillConfig config;
   
-  public ServiceEngine(DrillbitContext context){
-    this.context = context;
-    ByteBufAllocator allocator = context.getAllocator().getUnderlyingAllocator();
-    this.userServer = new UserServer(allocator, new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
-    this.bitCom = new BitComImpl(context);
+  public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){ // constructs the user server and the BitCom layer; bind() and bitCom.start() are only called from start()
+    this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker);
+    this.bitCom = new BitComImpl(context, bitComWorker);
+    this.config = context.getConfig(); // kept so start() can look up the initial user port
   }
   
-  public void start() throws DrillbitStartupException, InterruptedException{
-    userPort = userServer.bind(context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT));
-    bitPort = bitCom.start();
-  }
-  
-  public int getBitPort(){
-    return bitPort;
-  }
-  
-  public int getUserPort(){
-    return userPort;
+  public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{ // binds the user server, starts BitCom, and returns an endpoint describing both
+    int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT)); // the configured port is the request; the value bind() returns is what gets advertised
+    int bitPort = bitCom.start();
+    return DrillbitEndpoint.newBuilder()
+        .setAddress(InetAddress.getLocalHost().getHostAddress()) // getLocalHost() is the source of the declared UnknownHostException
+        .setBitPort(bitPort)
+        .setUserPort(userPort)
+        .build();
   }
 
   public BitCom getBitCom(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index d89b431..80704fa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -23,8 +23,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 9fc4165..67c84ed 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -17,9 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.store;
 
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.ops.OutputMutator;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
 
 public interface RecordReader {
 
@@ -35,7 +34,7 @@ public interface RecordReader {
    *          mutating the set of schema values for that particular record.
    * @throws ExecutionSetupException
    */
-  public abstract void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException;
+  public abstract void setup(OutputMutator output) throws ExecutionSetupException;
 
   /**
    * Increment record reader forward, writing into the provided output batch.  

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
index 67ea5b6..4884b7a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -22,8 +22,8 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
new file mode 100644
index 0000000..d2e8b8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+
+import com.google.protobuf.Internal.EnumLite;
+
+/**
+ * Simple wrapper class around AtomicInteger which allows management of a State value extending EnumLite.
+ * @param <T> The type of EnumLite to use for state.
+ */
+public abstract class AtomicState<T extends EnumLite> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicState.class);
+  
+  private final AtomicInteger state = new AtomicInteger(); // holds the EnumLite number of the current state
+  
+  /**
+   * Constructor that defines initial T state.
+   * @param initial The state this instance starts in; its EnumLite number is what gets stored.
+   */
+  public AtomicState(T initial){
+    state.set(initial.getNumber());
+  }
+  
+  protected abstract T getStateFromNumber(int i); // subclasses map a stored number back to the matching T value
+  
+  /**
+   * Does an atomic conditional update from one state to another.  
+   * @param oldState The expected current state.
+   * @param newState The desired new state.
+   * @return Whether or not the update was successful.
+   */
+  public boolean updateState(T oldState, T newState){
+    return state.compareAndSet(oldState.getNumber(), newState.getNumber());
+  }
+  
+  public T getState(){ // reads the stored number and decodes it through getStateFromNumber
+    return getStateFromNumber(state.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
new file mode 100644
index 0000000..0e8edd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
+
+public class AbstractFragmentRunnerListener implements FragmentRunnerListener{ // turns fragment state changes and failures into FragmentStatus messages and routes them to statusChange()
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
+  
+  private FragmentContext context;
+  private volatile long startNanos; // System.nanoTime() captured when RUNNING is reported; stays 0 until then
+  
+  public AbstractFragmentRunnerListener(FragmentContext context) {
+    super();
+    this.context = context;
+  }
+  
+  private  FragmentStatus.Builder getBuilder(FragmentState state){ // builds a status pre-filled with metrics, state, running time, handle and memory use
+    FragmentStatus.Builder status = FragmentStatus.newBuilder();
+    context.addMetricsToStatus(status);
+    status.setState(state);
+    status.setRunningTime(System.nanoTime() - startNanos); // NOTE(review): startNanos is still 0 before RUNNING and while the RUNNING status itself is built, so this is then the raw nanoTime value -- confirm that is intended
+    status.setHandle(context.getHandle());
+    status.setMemoryUse(context.getAllocator().getAllocatedMemory());
+    return status;
+  }
+  
+  @Override
+  public void stateChanged(FragmentHandle handle, FragmentState newState) { // dispatches each state to its protected hook; FAILED and SENDING are deliberately not forwarded
+    FragmentStatus.Builder status = getBuilder(newState); // built before the switch, i.e. before the RUNNING case records startNanos
+
+    switch(newState){
+    case AWAITING_ALLOCATION:
+      awaitingAllocation(handle, status);
+      break;
+    case CANCELLED:
+      cancelled(handle, status);
+      break;
+    case FAILED:
+      // no op since fail should have also been called.
+      break;
+    case FINISHED:
+      finished(handle, status);
+      break;
+    case RUNNING:
+      this.startNanos = System.nanoTime();
+      running(handle, status);
+      break;
+    case SENDING:
+      // no op.
+      break;
+    default:
+      break;
+    
+    }
+  }
+  
+  protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){ // default: build the status and forward it to statusChange()
+    statusChange(handle, statusBuilder.build());
+  }
+  
+  protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){ // default: build the status and forward it to statusChange()
+    statusChange(handle, statusBuilder.build());
+  }
+
+  protected void cancelled(FragmentHandle handle, FragmentStatus.Builder statusBuilder){ // default: build the status and forward it to statusChange()
+    statusChange(handle, statusBuilder.build());
+  }
+
+  protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){ // default: build the status and forward it to statusChange()
+    statusChange(handle, statusBuilder.build());
+  }
+  
+  protected void statusChange(FragmentHandle handle, FragmentStatus status){ // no-op here; every hook in this class funnels into this method, so a subclass overrides it to receive all statuses
+    
+  }
+  
+  @Override
+  public final void fail(FragmentHandle handle, String message, Throwable excep) { // final: always builds a FAILED status carrying the converted error, then defers to the protected fail() overload
+    FragmentStatus.Builder status = getBuilder(FragmentState.FAILED);
+    status.setError(ErrorHelper.logAndConvertError(context.getIdentity(), message, excep, logger));
+    fail(handle, status);
+  }
+
+  protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){ // default: build the status and forward it to statusChange()
+    statusChange(handle, statusBuilder.build());
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
new file mode 100644
index 0000000..3c7ef04
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+public interface CancelableQuery { // single-method contract for requesting cancellation
+  public void cancel(); // returns nothing and declares no checked exception
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
new file mode 100644
index 0000000..f6a9786
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{ // RPC outcome listener that additionally carries a DrillbitEndpoint and a caller-supplied value of type V
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
+
+  protected final DrillbitEndpoint endpoint; // set once in the constructor
+  protected final V value; // set once in the constructor
+  
+  public EndpointListener(DrillbitEndpoint endpoint, V value) { // stores both arguments as given; neither is null-checked
+    super();
+    this.endpoint = endpoint;
+    this.value = value;
+  }
+
+  protected DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  protected V getValue() {
+    return value;
+  }
+
+  
+}


Mime
View raw message