drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/3] Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.
Date Wed, 22 May 2013 01:39:05 GMT
Updated Branches:
  refs/heads/execwork e57a8d6d4 -> b8db98ad7


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index d3664a0..1170a1e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -40,7 +41,6 @@ import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.batch.BitComHandlerImpl;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.common.collect.Maps;
@@ -63,7 +63,7 @@ public class WorkManager implements Closeable{
   private final BitComHandler bitComWorker;
   private final UserWorker userWorker;
   private final WorkerBee bee;
-  private Executor executor = Executors.newFixedThreadPool(4);
+  private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("Working
Thread - "));
   private final EventThread eventThread;
   
   public WorkManager(BootStrapContext context){
@@ -148,9 +148,10 @@ public class WorkManager implements Closeable{
   public void run() {
     try {
     while(true){
-      logger.debug("Checking for pending work tasks.");
+      logger.debug("Polling for pending work tasks.");
       Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
       if(r != null){
+        logger.debug("Starting pending task {}", r);
         executor.execute(r);  
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index 5dacb71..ec03392 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -67,18 +67,23 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
   
   public abstract void streamFinished(int minorFragmentId);
   
-  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch
batch) {
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch
batch) {
+    boolean decremented = false;
     if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
       int rem = remainingRequired.decrementAndGet();
       if (rem == 0) {
         parentAccounter.decrementAndGet();
+        decremented = true;
       }
     }
     if(batch.getHeader().getIsLastBatch()){
       streamFinished(minorFragmentId);
     }
     getBuffer(minorFragmentId).enqueue(throttle, batch);
+    return decremented;
   }
 
+  
+
   protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index ff091d7..b5a497e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -24,8 +24,7 @@ import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
 interface BatchCollector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
-
-  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch
batch);
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch
batch);
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 9b227da..edda714 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -159,14 +159,11 @@ public class BitComHandlerImpl implements BitComHandler {
     // Create a handler if there isn't already one.
     if(handler == null){
       
-      
-      
       PlanFragment fragment = bee.getContext().getCache().getFragment(handle);
       if(fragment == null){
         logger.error("Received batch where fragment was not in cache.");
         return Acks.FAIL;
       }
-      
 
       IncomingFragmentHandler newHandler = new RemoteFragmentHandler(fragment, bee.getContext(),
bee.getContext().getBitCom().getTunnel(fragment.getForeman()));
       
@@ -174,7 +171,7 @@ public class BitComHandlerImpl implements BitComHandler {
       handler = handlers.putIfAbsent(fragment.getHandle(), newHandler);
           
       if(handler == null){
-        // we added a handler, inform foreman that we did so.  This way, the foreman can
track status.  We also tell foreman that we don't need inform ourself.
+        // we added a handler, inform the bee that we did so.  This way, the foreman can
track status. 
         bee.addFragmentPendingRemote(newHandler);
         handler = newHandler;
       }
@@ -182,10 +179,12 @@ public class BitComHandlerImpl implements BitComHandler {
     
     boolean canRun = handler.handle(connection.getConnectionThrottle(), new RawFragmentBatch(fragmentBatch,
body));
     if(canRun){
+      logger.debug("Arriving batch means local batch can run, starting local batch.");
       // if we've reached the canRun threshold, we'll proceed.  This expects handler.handle()
to only return a single true.
       bee.startFragmentPendingRemote(handler);
     }
-    if(handler.isDone()){
+    if(fragmentBatch.getIsLastBatch() && !handler.isWaiting()){
+      logger.debug("Removing handler.  Is Last Batch {}.  Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
handler.isWaiting());
       handlers.remove(handler.getHandle());
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 20775c5..264c4b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.work.batch;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,6 +29,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -42,7 +44,10 @@ public class IncomingBuffers {
 
   public IncomingBuffers(PhysicalOperator root) {
     Map<Integer, BatchCollector> counts = Maps.newHashMap();
-    root.accept(new CountRequiredFragments(), counts);
+    CountRequiredFragments reqFrags = new CountRequiredFragments();
+    root.accept(reqFrags, counts);
+    
+    logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(),
counts);
     streamsRemaining.set(remainingRequired.get());
     fragCounts = ImmutableMap.copyOf(counts);
   }
@@ -53,11 +58,13 @@ public class IncomingBuffers {
     if(batch.getHeader().getIsLastBatch()){
       streamsRemaining.decrementAndGet();
     }
+    int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
+    BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
+    if (fSet == null) throw new FragmentSetupException(String.format("We received a major
fragment id that we were not expecting.  The id was %d.", sendMajorFragmentId));
+    boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(),
batch);
     
-    BatchCollector fSet = fragCounts.get(batch.getHeader().getSendingMajorFragmentId());
-    if (fSet == null) throw new FragmentSetupException("We received a major fragment id that
we were not expecting.");
-    fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
-    return remainingRequired.get() == 0;
+    // we should only return true if remaining required has been decremented and is currently
equal to zero.
+    return decremented && remainingRequired.get() == 0;
   }
 
   public int getRemainingRequired() {
@@ -75,7 +82,7 @@ public class IncomingBuffers {
    * Designed to setup initial values for arriving fragment accounting.
    */
   public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer,
BatchCollector>, RuntimeException> {
-
+    
     @Override
     public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts)
throws RuntimeException {
       BatchCollector set;
@@ -84,7 +91,7 @@ public class IncomingBuffers {
       } else {
         set = new PartitionedCollector(remainingRequired, receiver);
       }
-
+      
       counts.put(set.getOppositeMajorFragmentId(), set);
       remainingRequired.incrementAndGet();
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index e21d69a..93868a7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -27,7 +27,7 @@ public class MergingCollector extends AbstractFragmentCollector{
   
   public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
     super(parentAccounter, receiver, 1);
-    streamsRunning = new AtomicInteger(parentAccounter.get());
+    streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
   }
 
   @Override
@@ -35,10 +35,11 @@ public class MergingCollector extends AbstractFragmentCollector{
     return buffers[0];
   }
 
-  @Override
+  
   public void streamFinished(int minorFragmentId) {
     if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
   }
+
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 116ca26..25b5884 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -36,6 +36,7 @@ public class PartitionedCollector extends AbstractFragmentCollector{
   public void streamFinished(int minorFragmentId) {
     buffers[minorFragmentId].finished();
   }
+
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
index f97d878..71ae576 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
@@ -65,7 +65,7 @@ public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
       }
     }
     
-    return null;
+    return b;
     
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index dea8282..f86c4fb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -24,12 +24,9 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.ImplCreator;
-import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
@@ -45,8 +42,8 @@ import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.AtomicState;
@@ -126,14 +123,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   
   void cleanupAndSendResult(QueryResult result){
     bee.retireForeman(this);
-    initiatingClient.sendResult(new QueryWritableBatch(result)).addLightListener(new ResponseSendListener());
+    initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result));
   }
 
-  private class ResponseSendListener extends RpcOutcomeListener<Ack> {
+  private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
     @Override
     public void failed(RpcException ex) {
-      logger
-          .info(
+      logger.info(
               "Failure while trying communicate query result to initating client.  This would
happen if a client is disconnected before response notice can be sent.",
               ex);
     }
@@ -193,12 +189,17 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       fail("Failure while fragmenting query.", e);
       return;
     }
+    
+    
+
+    
     PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
     SimpleParallelizer parallelizer = new SimpleParallelizer();
 
     try {
       QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId,
context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
 
+      this.context.getBitCom().getListeners().addFragmentStatusListener(work.getRootFragment().getHandle(),
fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();
 
       // store fragments in distributed grid.
@@ -213,7 +214,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient,
leafFragments);
 
     
-    } catch (ExecutionSetupException e) {
+    } catch (ExecutionSetupException | RpcException e) {
       fail("Failure while setting up query.", e);
     }
 
@@ -245,9 +246,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     return this.state.getState();
   }
 
-  public boolean rootCoorespondsTo(FragmentHandle handle){
-    throw new UnsupportedOperationException();
-  }
   
   class ForemanManagerListener{
     void fail(String message, Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 20797b8..f069db7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -64,12 +64,13 @@ class RunningFragmentManager implements FragmentStatusListener{
     this.foreman = foreman;
     this.tun = tun;
     this.remainingFragmentCount = new AtomicInteger(0);
+    
   }
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator,
UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
     remainingFragmentCount.set(leafFragments.size()+1);
 
-    // set up the root framgnet first so we'll have incoming buffers available.
+    // set up the root fragment first so we'll have incoming buffers available.
     {
       IncomingBuffers buffers = new IncomingBuffers(rootOperator);
       
@@ -97,13 +98,13 @@ class RunningFragmentManager implements FragmentStatusListener{
   private void sendRemoteFragment(PlanFragment fragment){
     map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(),
false));
     FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(),
fragment);
-    tun.get(fragment.getAssignment()).sendFragment(fragment).addLightListener(listener);
+    tun.get(fragment.getAssignment()).sendFragment(listener, fragment);
   }
   
   
   @Override
   public void statusUpdate(FragmentStatus status) {
-    
+    logger.debug("New fragment status was provided to Foreman of {}", status);
     switch(status.getState()){
     case AWAITING_ALLOCATION:
       updateStatus(status);
@@ -205,6 +206,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
     @Override
     public void failed(RpcException ex) {
+      logger.debug("Failure while sending fragment.  Stopping query.", ex);
       stopQuery();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
index b4e9308..b23f003 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
@@ -44,6 +44,6 @@ public interface IncomingFragmentHandler {
   public abstract FragmentRunner getRunnable();
 
   public abstract void cancel();
-  public boolean isDone();
+  public boolean isWaiting();
   public abstract FragmentHandle getHandle();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
index 3f710ed..5ffd09a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
@@ -60,8 +60,8 @@ public class LocalFragmentHandler implements IncomingFragmentHandler{
   }
 
   @Override
-  public boolean isDone() {
-    return cancel || isDone();
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index 70d7e93..4a5dbf2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -113,8 +113,8 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler
{
   }
 
   @Override
-  public boolean isDone() {
-    return cancel || buffers.isDone();
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
index 7c6bfe5..586ccf6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import static org.junit.Assert.*;
+
 import java.util.List;
 
 import org.apache.drill.common.util.FileUtils;
@@ -26,26 +28,29 @@ import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-@Ignore
+//@Ignore
 public class DistributedFragmentRun extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedFragmentRun.class);
   
   
   @Test 
-  public void simpleDistributedQuery() throws Exception{
+  public void oneBitOneExchangeRun() throws Exception{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG,
serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG,
serviceSet.getCoordinator());){
       bit1.run();
-      bit2.run();
       client.connect();
       List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"),
Charsets.UTF_8));
-      System.out.println(results);
+      int count = 0;
+      for(QueryResultBatch b : results){
+        count += b.getHeader().getRowCount();
+      }
+      assertEquals(100, count);
     }
     
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
index 7b7ab8e..1e0c5b6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
@@ -23,28 +23,12 @@ import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.fragment.Fragment;
-import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
-import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
-import org.apache.drill.exec.planner.SimpleExecPlanner;
+import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.QueryWorkUnit;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
 public class CheckFragmenter extends PopUnitTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckFragmenter.class);
 
@@ -77,10 +61,11 @@ public class CheckFragmenter extends PopUnitTestBase {
     assertNotNull(b.getSendingExchange());
   }
 
-  
 
 
   
+
+  
   
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
index 6f229a3..e1db639 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
@@ -17,16 +17,15 @@
  ******************************************************************************/
 package org.apache.drill.exec.pop;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -41,26 +40,46 @@ public class FragmentChecker extends PopUnitTestBase{
   
   @Test
   public void checkSimpleExchangePlan() throws Exception{
+    print("/physical_simpleexchange.json", 2, 3);
+
+  }
+  
+  
+  private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws
Exception{
     
+    System.out.println(String.format("=================Building plan fragments for [%s].
 Allowing %d total Drillbits.==================", fragmentFile, bitCount));
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
-    Fragment fragmentRoot = getRootFragment(ppr, "/physical_simpleexchange.json");
+    Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
     PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
     SimpleParallelizer par = new SimpleParallelizer();
+    List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    DrillbitEndpoint localBit = null;
+    for(int i =0; i < bitCount; i++){
+      DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234+i).build();
+      if(i ==0) localBit = b1; 
+      endpoints.add(b1);
+    }
     
-    DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234).build();
-    DrillbitEndpoint b2 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(2345).build();
     
-    QueryWorkUnit qwu = par.getFragments(b1, QueryId.getDefaultInstance(), Lists.newArrayList(b1,
b2), ppr, fragmentRoot, planningSet, 10);
-    assertEquals(qwu.getFragments().size(), 3);
-    System.out.println("=========ROOT FRAGMENT=========");
+    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints,
ppr, fragmentRoot, planningSet, 10);
+    System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(),
qwu.getRootFragment().getHandle().getMinorFragmentId()));
+    
     System.out.print(qwu.getRootFragment().getFragmentJson());
     
     
     for(PlanFragment f : qwu.getFragments()){
-      System.out.println("=========");
+      System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(),
f.getHandle().getMinorFragmentId()));
       System.out.print(f.getFragmentJson());
     }
+    
+    //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
+
     logger.debug("Planning Set {}", planningSet);
+  }
+  
+  @Test
+  public void validateSingleExchangeFragment() throws Exception{
+    print("/physical_single_exchange.json", 1, 2);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 9684e9f..038b093 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -19,27 +19,25 @@ package org.apache.drill.exec.server;
 
 import io.netty.buffer.ByteBuf;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.bit.BitClient;
-import org.apache.drill.exec.rpc.bit.BitComImpl;
 import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager;
 import org.apache.drill.exec.rpc.bit.BitRpcConfig;
 import org.apache.drill.exec.rpc.bit.BitServer;
+import org.apache.drill.exec.rpc.bit.BitTunnel.SendFragmentStatus;
+import org.apache.drill.exec.rpc.bit.ConnectionManagerRegistry;
 import org.apache.drill.exec.rpc.bit.ListenerPool;
 import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
-
 public class TestBitRpc {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
   
@@ -47,13 +45,19 @@ public class TestBitRpc {
   public void testBasicConnectionAndHandshake() throws Exception{
     int port = 1234;
     BootStrapContext c = new BootStrapContext(DrillConfig.create());
-    ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
-    BitServer server = new BitServer(new BitComTestHandler(), c, registry, new ListenerPool(2));
+    final BitComTestHandler handler = new BitComTestHandler();
+    final ListenerPool listeners = new ListenerPool(2);
+    ConnectionManagerRegistry registry = new ConnectionManagerRegistry(handler, c, listeners);
+    BitServer server = new BitServer(handler, c, registry, listeners);
     port = server.bind(port);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build();
+    registry.setEndpoint(ep);
     for(int i =0; i < 10; i++){
-      BitClient client = new BitClient(DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build(),
null, new BitComTestHandler(), c, registry, new ListenerPool(2));
-      client.connect();
-      
+      try(BitConnectionManager cm = new BitConnectionManager(ep, ep, handler, c, listeners)){
+        SendFragmentStatus cmd = new SendFragmentStatus(FragmentStatus.getDefaultInstance());
+        cm.runCommand(cmd);
+        cmd.getFuture().checkedGet();
+      }
     }
     System.out.println("connected");
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
index 675ecfb..0e1921e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
@@ -17,7 +17,6 @@
             	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
             	  {name: "green", type: "INT", mode: "REQUIRED"}
             	]}
-            	
             ]
         },
         {


Mime
View raw message