Clean up threading of client/server. Use the command pattern for BitCom operations to abstract away connection failures. The one-bit, single-exchange remote query now works. Next up: the two-bit, single-exchange query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8db98ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8db98ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8db98ad
Branch: refs/heads/execwork
Commit: b8db98ad7c159db3cf41a3866ff53013f87964b4
Parents: e57a8d6
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue May 21 18:38:56 2013 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Tue May 21 18:38:56 2013 -0700
----------------------------------------------------------------------
.../drill/common/graph/AdjacencyListBuilder.java | 2 +-
.../org/apache/drill/exec/cache/LocalCache.java | 2 +
.../org/apache/drill/exec/client/DrillClient.java | 77 ++++--
.../drill/exec/coord/LocalClusterCoordinator.java | 7 +-
.../org/apache/drill/exec/ops/FragmentContext.java | 19 ++-
.../org/apache/drill/exec/ops/QueryContext.java | 4 +
.../exec/physical/config/MockRecordReader.java | 1 -
.../drill/exec/physical/config/RandomReceiver.java | 5 -
.../apache/drill/exec/physical/config/Screen.java | 2 +-
.../drill/exec/physical/impl/ScreenCreator.java | 79 +++++-
.../exec/physical/impl/SingleSenderCreator.java | 41 +++-
.../drill/exec/physical/impl/WireRecordBatch.java | 8 +-
.../impl/materialize/QueryWritableBatch.java | 8 +
.../impl/materialize/VectorRecordMaterializer.java | 11 +-
.../drill/exec/planner/fragment/Materializer.java | 8 +-
.../exec/planner/fragment/SimpleParallelizer.java | 4 +-
.../exec/planner/fragment/StatsCollector.java | 2 +-
.../apache/drill/exec/record/RawFragmentBatch.java | 5 +
.../drill/exec/rpc/AbstractHandshakeHandler.java | 5 +-
.../drill/exec/rpc/BaseRpcOutcomeListener.java | 32 +++
.../org/apache/drill/exec/rpc/BasicClient.java | 176 ++++++++------
.../drill/exec/rpc/BasicClientWithConnection.java | 9 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 7 +-
.../rpc/ChannelListenerWithCoordinationId.java | 25 ++
.../apache/drill/exec/rpc/CoordinationQueue.java | 96 +++++++--
.../org/apache/drill/exec/rpc/DrillRpcFuture.java | 2 -
.../apache/drill/exec/rpc/DrillRpcFutureImpl.java | 70 +-----
.../java/org/apache/drill/exec/rpc/RpcBus.java | 88 ++++---
.../apache/drill/exec/rpc/RpcCheckedFuture.java | 33 +++
.../drill/exec/rpc/RpcConnectionHandler.java | 28 +++
.../org/apache/drill/exec/rpc/RpcException.java | 13 +
.../java/org/apache/drill/exec/rpc/RpcOutcome.java | 26 ++
.../apache/drill/exec/rpc/RpcOutcomeListener.java | 7 +-
.../exec/rpc/ZeroCopyProtobufLengthDecoder.java | 2 +-
.../org/apache/drill/exec/rpc/bit/BitClient.java | 52 ++---
.../java/org/apache/drill/exec/rpc/bit/BitCom.java | 8 +-
.../org/apache/drill/exec/rpc/bit/BitComImpl.java | 129 ++---------
.../org/apache/drill/exec/rpc/bit/BitCommand.java | 28 +++
.../apache/drill/exec/rpc/bit/BitConnection.java | 79 +-----
.../drill/exec/rpc/bit/BitConnectionManager.java | 175 +++++++++++---
.../org/apache/drill/exec/rpc/bit/BitServer.java | 60 ++++-
.../org/apache/drill/exec/rpc/bit/BitTunnel.java | 187 ++++-----------
.../exec/rpc/bit/ConnectionManagerRegistry.java | 73 ++++++
.../drill/exec/rpc/bit/FutureBitCommand.java | 78 ++++++
.../apache/drill/exec/rpc/bit/ListenerPool.java | 15 +-
.../drill/exec/rpc/bit/ListeningBitCommand.java | 73 ++++++
.../drill/exec/rpc/user/QueryResultBatch.java | 7 +
.../drill/exec/rpc/user/QueryResultHandler.java | 153 ++++++++++++
.../org/apache/drill/exec/rpc/user/UserClient.java | 153 ++-----------
.../drill/exec/rpc/user/UserResultsListener.java | 11 +-
.../org/apache/drill/exec/rpc/user/UserServer.java | 10 +-
.../apache/drill/exec/server/BootStrapContext.java | 2 +-
.../apache/drill/exec/server/RemoteServiceSet.java | 2 +
.../apache/drill/exec/service/ServiceEngine.java | 6 +-
.../apache/drill/exec/work/EndpointListener.java | 5 +-
.../org/apache/drill/exec/work/FragmentRunner.java | 10 +-
.../org/apache/drill/exec/work/WorkManager.java | 7 +-
.../exec/work/batch/AbstractFragmentCollector.java | 7 +-
.../drill/exec/work/batch/BatchCollector.java | 3 +-
.../drill/exec/work/batch/BitComHandlerImpl.java | 9 +-
.../drill/exec/work/batch/IncomingBuffers.java | 21 +-
.../drill/exec/work/batch/MergingCollector.java | 5 +-
.../exec/work/batch/PartitionedCollector.java | 1 +
.../exec/work/batch/UnlmitedRawBatchBuffer.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 22 +-
.../exec/work/foreman/RunningFragmentManager.java | 8 +-
.../work/fragment/IncomingFragmentHandler.java | 2 +-
.../exec/work/fragment/LocalFragmentHandler.java | 4 +-
.../exec/work/fragment/RemoteFragmentHandler.java | 4 +-
.../exec/physical/impl/DistributedFragmentRun.java | 17 +-
.../org/apache/drill/exec/pop/CheckFragmenter.java | 21 +--
.../org/apache/drill/exec/pop/FragmentChecker.java | 41 +++-
.../org/apache/drill/exec/server/TestBitRpc.java | 26 ++-
.../test/resources/physical_single_exchange.json | 1 -
74 files changed, 1498 insertions(+), 923 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
index 1668477..4a385ce 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
@@ -57,7 +57,7 @@ import java.util.Map;
}
public AdjacencyList<V> getAdjacencyList() {
- logger.debug("Values; {}", ops.values().toArray());
+// logger.debug("Values; {}", ops.values().toArray());
AdjacencyList<V> a = new AdjacencyList<V>();
for (AdjacencyList<V>.Node from : ops.values()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index ddb2a02..b656f2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -43,11 +43,13 @@ public class LocalCache implements DistributedCache {
@Override
public PlanFragment getFragment(FragmentHandle handle) {
+ logger.debug("looking for fragment with handle: {}", handle);
return handles.get(handle);
}
@Override
public void storeFragment(PlanFragment fragment) {
+ logger.debug("Storing fragment: {}", fragment);
handles.put(fragment.getHandle(), fragment);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bb7f77e..c35e834 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -30,22 +30,23 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.proto.UserProtos.RpcType;
-import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.rpc.user.UserRpcConfig;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
@@ -75,9 +76,6 @@ public class DrillClient implements Closeable{
}
-
-
-
/**
* Connects the client to a Drillbit server
*
@@ -97,7 +95,9 @@ public class DrillClient implements Closeable{
this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
try {
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
- this.client.connect(endpoint);
+ FutureHandler f = new FutureHandler();
+ this.client.connect(f, endpoint);
+ f.checkedGet();
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -120,34 +120,63 @@ public class DrillClient implements Closeable{
* @throws RpcException
*/
public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
- try {
- ListHoldingResultsListener listener = new ListHoldingResultsListener();
- Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
- f.get();
- if(listener.ex != null){
- throw listener.ex;
- }else{
- return listener.results;
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RpcException(e);
- }
+ ListHoldingResultsListener listener = new ListHoldingResultsListener();
+ client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
+ return listener.getResults();
+
}
- private class ListHoldingResultsListener extends UserResultsListener{
- private RpcException ex;
+ private class ListHoldingResultsListener implements UserResultsListener {
private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+ private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
@Override
public void submissionFailed(RpcException ex) {
logger.debug("Submission failed.", ex);
- this.ex = ex;
+ future.setException(ex);
}
@Override
public void resultArrived(QueryResultBatch result) {
logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
+ if(result.getHeader().getIsLastChunk()){
+ future.set(results);
+ }
+ }
+
+ public List<QueryResultBatch> getResults() throws RpcException{
+ try{
+ return future.get();
+ }catch(Throwable t){
+ throw RpcException.mapException(t);
+ }
+ }
+ }
+
+ private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{
+
+ protected FutureHandler() {
+ super( SettableFuture.<Void>create());
+ }
+
+ @Override
+ public void connectionSucceeded(ServerConnection connection) {
+ getInner().set(null);
+ }
+
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ getInner().setException(new RpcException(String.format("Failure connecting to server. Failure of type %s.", type.name()), t));
+ }
+
+ private SettableFuture<Void> getInner(){
+ return (SettableFuture<Void>) delegate();
+ }
+
+ @Override
+ protected RpcException mapException(Exception e) {
+ return RpcException.mapException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
index 43a5430..f7b3549 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -29,17 +29,16 @@ import com.google.common.collect.Maps;
public class LocalClusterCoordinator extends ClusterCoordinator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
- private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints;
+ private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
@Override
public void close() throws IOException {
- endpoints = null;
+ endpoints.clear();
}
@Override
public void start(long millis) throws Exception {
logger.debug("Local Cluster Coordinator started.");
- endpoints = Maps.newConcurrentMap();
}
@Override
@@ -52,6 +51,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator{
@Override
public void unregister(RegistrationHandle handle) {
+ if(handle == null) return;
+
endpoints.remove(handle);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e64453c..33707a0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -52,7 +52,9 @@ public class FragmentContext {
private final FragmentHandle handle;
private final UserClientConnection connection;
private final IncomingBuffers buffers;
-
+ private volatile Throwable failureCause;
+ private volatile boolean failed = false;
+
public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
@@ -65,9 +67,10 @@ public class FragmentContext {
}
public void fail(Throwable cause) {
-
+ logger.debug("Fragment Context received failure. {}", cause);
+ failed = true;
+ failureCause = cause;
}
-
public DrillbitContext getDrillbitContext(){
return context;
@@ -107,4 +110,14 @@ public class FragmentContext {
public IncomingBuffers getBuffers(){
return buffers;
}
+
+ public Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ public boolean isFailed(){
+ return failed;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fd24deb..1c251b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.server.DrillbitContext;
public class QueryContext {
@@ -57,4 +58,7 @@ public class QueryContext {
return drillbitContext.getPlanReader();
}
+ public BitCom getBitCom(){
+ return drillbitContext.getBitCom();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index eaaeaa3..6a1eba4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -76,7 +76,6 @@ public class MockRecordReader implements RecordReader {
int batchRecordCount = 250000 / estimateRowSize;
for (int i = 0; i < config.getTypes().length; i++) {
- logger.debug("Adding field {} of type {}", i, config.getTypes()[i]);
valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
output.addField(i, valueVectors[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
index ed41586..6772fb0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -72,11 +72,6 @@ public class RandomReceiver extends AbstractReceiver{
return new Size(1,1);
}
- @Override
- public int getOppositeMajorFragmentId() {
- return 0;
- }
-
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 86a201d..688c6b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -64,7 +64,7 @@ public class Screen extends AbstractStore implements Root{
// didn't get screwed up.
if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
DrillbitEndpoint endpoint = endpoints.iterator().next();
- logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+// logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
if (!endpoint.equals(this.endpoint)) {
throw new PhysicalOperatorSetupException(String.format(
"A Screen operator can only be assigned to its home node. Expected endpoint %s, Actual endpoint: %s",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c0711db..c20538d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,21 +17,32 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import io.netty.buffer.ByteBuf;
+
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
import com.google.common.base.Preconditions;
public class ScreenCreator implements RootCreator<Screen>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
-
+
+
@Override
public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
Preconditions.checkArgument(children.size() == 1);
@@ -40,7 +51,9 @@ public class ScreenCreator implements RootCreator<Screen>{
private static class ScreenRoot implements RootExec{
-
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+ volatile boolean ok = true;
+
final RecordBatch incoming;
final FragmentContext context;
final UserClientConnection connection;
@@ -56,25 +69,53 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public boolean next() {
+ if(!ok){
+ stop();
+ return false;
+ }
+
IterOutcome outcome = incoming.next();
- boolean isLast = false;
+ logger.debug("Screen Outcome {}", outcome);
switch(outcome){
- case NONE:
- case STOP:
- connection.sendResult(materializer.convertNext(true));
- context.batchesCompleted.inc(1);
- context.recordsCompleted.inc(incoming.getRecordCount());
+ case STOP: {
+ QueryResult header1 = QueryResult.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+
+ connection.sendResult(listener, batch1);
+ return false;
+ }
+ case NONE: {
+ if(materializer == null){
+ // receive no results.
+ context.batchesCompleted.inc(1);
+ context.recordsCompleted.inc(incoming.getRecordCount());
+ QueryResult header2 = QueryResult.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch batch2 = new QueryWritableBatch(header2);
+ connection.sendResult(listener, batch2);
+ }else{
+ connection.sendResult(listener, materializer.convertNext(true));
+ }
return false;
-
+ }
case OK_NEW_SCHEMA:
materializer = new VectorRecordMaterializer(context, incoming);
// fall through.
- // fall through
case OK:
- connection.sendResult(materializer.convertNext(false));
context.batchesCompleted.inc(1);
context.recordsCompleted.inc(incoming.getRecordCount());
- return !isLast;
+ connection.sendResult(listener, materializer.convertNext(false));
+ return true;
default:
throw new UnsupportedOperationException();
}
@@ -85,6 +126,20 @@ public class ScreenCreator implements RootCreator<Screen>{
incoming.kill();
}
+ private SendListener listener = new SendListener();
+
+ private class SendListener extends BaseRpcOutcomeListener<Ack>{
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.error("Failure while sending data to user.", ex);
+ ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+ ok = false;
+ }
+
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 60c2d78..b7d4c7e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -23,9 +23,12 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.bit.BitTunnel;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -45,9 +48,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
private FragmentHandle handle;
private int recMajor;
private FragmentContext context;
+ private volatile boolean ok = true;
public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
- logger.debug("Creating single sender root exec base on config: {}", config);
this.incoming = batch;
this.handle = context.getHandle();
this.recMajor = config.getOppositeMajorFragmentId();
@@ -57,20 +60,24 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public boolean next() {
+ if(!ok){
+ incoming.kill();
+
+ return false;
+ }
IterOutcome out = incoming.next();
logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
- FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
- tunnel.sendRecordBatch(context, b2);
+ FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+ tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
return false;
-
case OK:
case OK_NEW_SCHEMA:
- FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
- tunnel.sendRecordBatch(context, batch);
+ FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+ tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
return true;
case NOT_YET:
@@ -81,9 +88,31 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void stop() {
+ ok = false;
}
+ private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
+
+ @Override
+ public void failed(RpcException ex) {
+ context.fail(ex);
+ stop();
+ }
+
+ @Override
+ public void success(Ack value) {
+ if(value.getOk()) return;
+
+ logger.error("Downstream fragment was not accepted. Stopping future sends.");
+ // if we didn't get ack ok, we'll need to kill the query.
+ context.fail(new RpcException("A downstream fragment batch wasn't accepted. This fragment thus fails."));
+ stop();
+ }
+
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fc7f833..b41b0cd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -38,6 +38,7 @@ public class WireRecordBatch implements RecordBatch{
private RecordBatchLoader batchLoader;
private RawFragmentBatchProvider fragProvider;
private FragmentContext context;
+ private BatchSchema schema;
public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
@@ -53,7 +54,7 @@ public class WireRecordBatch implements RecordBatch{
@Override
public BatchSchema getSchema() {
- return null;
+ return schema;
}
@Override
@@ -73,13 +74,16 @@ public class WireRecordBatch implements RecordBatch{
@Override
public IterOutcome next() {
- RawFragmentBatch batch = this.fragProvider.getNext();
+ RawFragmentBatch batch = fragProvider.getNext();
try{
if(batch == null) return IterOutcome.NONE;
+ logger.debug("Next received batch {}", batch);
+
RecordBatchDef rbd = batch.getHeader().getDef();
boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
if(schemaChanged){
+ this.schema = batchLoader.getSchema();
return IterOutcome.OK_NEW_SCHEMA;
}else{
return IterOutcome.OK;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 187e6e9..e8ed48a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.materialize;
+import java.util.Arrays;
+
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
@@ -42,5 +44,11 @@ public class QueryWritableBatch {
public QueryResult getHeader() {
return header;
}
+
+ @Override
+ public String toString() {
+ return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index e2d2eb9..7929296 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.materialize;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
@@ -33,10 +34,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
this.queryId = context.getHandle().getQueryId();
this.batch = batch;
-
- for (MaterializedField f : batch.getSchema()) {
- logger.debug("New Field: {}", f);
- }
+ BatchSchema schema = batch.getSchema();
+ assert schema != null : "Schema must be defined.";
+
+// for (MaterializedField f : batch.getSchema()) {
+// logger.debug("New Field: {}", f);
+// }
}
public QueryWritableBatch convertNext(boolean isLast) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 9fee586..da71271 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -41,13 +41,13 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
// this is a sending exchange.
PhysicalOperator child = exchange.getChild().accept(this, iNode);
PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
- logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
+// logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
return materializedSender;
}else{
// receiving exchange.
PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
- logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
+// logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
return materializedReceiver;
}
}
@@ -63,7 +63,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
try {
PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
- logger.debug("New materialized store node {} with child {}", o, child);
+// logger.debug("New materialized store node {} with child {}", o, child);
return o;
} catch (PhysicalOperatorSetupException e) {
throw new FragmentSetupException("Failure while generating a specific Store materialization.");
@@ -72,7 +72,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
- logger.debug("Visiting catch all: {}", op);
+// logger.debug("Visiting catch all: {}", op);
List<PhysicalOperator> children = Lists.newArrayList();
for(PhysicalOperator child : op){
children.add(child.accept(this, iNode));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index fc03a23..8adb447 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -145,7 +145,7 @@ public class SimpleParallelizer {
// figure out width.
int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
float diskCost = stats.getDiskCost();
- logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+// logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
// TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
// of tasks or the maximum width of the fragment.
@@ -154,7 +154,7 @@ public class SimpleParallelizer {
}
if (width < 1) width = 1;
- logger.debug("Setting width {} on fragment {}", width, wrapper);
+// logger.debug("Setting width {} on fragment {}", width, wrapper);
wrapper.setWidth(width);
// figure out endpoint assignments. also informs the exchanges about their respective endpoints.
wrapper.assignEndpoints(allNodes);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index d53a78c..af8ec04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -41,7 +41,7 @@ public class StatsCollector {
Wrapper wrapper = planningSet.get(n);
n.getRoot().accept(opStatCollector, wrapper);
- logger.debug("Set stats to {}", wrapper.getStats());
+// logger.debug("Set stats to {}", wrapper.getStats());
// receivers...
for (ExchangeFragmentPair child : n) {
// get the fragment node that feeds this node.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index c244cea..4f87224 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -41,4 +41,9 @@ public class RawFragmentBatch {
return body;
}
+ @Override
+ public String toString() {
+ return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 859d385..ea591da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -25,8 +25,7 @@ import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
- ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
protected final EnumLite handshakeType;
@@ -41,7 +40,7 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
@Override
public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
- coordinationId = inbound.coordinationId;
+ this.coordinationId = inbound.coordinationId;
ctx.channel().pipeline().remove(this);
if (inbound.rpcType != handshakeType.getNumber())
throw new RpcException(String.format("Handshake failure. Expected %s[%d] but received number [%d]",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
new file mode 100644
index 0000000..1dab1c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
+
+ @Override
+ public void failed(RpcException ex) {
+ }
+
+ @Override
+ public void success(T value) {
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0ff2b9d..0afc5d0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -29,22 +29,30 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
+
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
+ extends RpcBus<T, R> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
- private Bootstrap b;
+ private final Bootstrap b;
private volatile boolean connect = false;
protected R connection;
- private EventLoopGroup eventLoop;
+ private final T handshakeType;
+ private final Class<HANDSHAKE_RESPONSE> responseClass;
+ private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
- public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
super(rpcMapping);
- this.eventLoop = eventLoopGroup;
+ this.responseClass = responseClass;
+ this.handshakeType = handshakeType;
+ this.handshakeParser = handshakeParser;
b = new Bootstrap() //
.group(eventLoopGroup) //
@@ -59,12 +67,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
logger.debug("initializing client connection.");
connection = initRemoteConnection(ch);
ch.closeFuture().addListener(getCloseHandler(connection));
-
+
ch.pipeline().addLast( //
new ZeroCopyProtobufLengthDecoder(), //
new RpcDecoder(rpcConfig.getName()), //
new RpcEncoder(rpcConfig.getName()), //
- getHandshakeHandler(), //
+ new ClientHandshakeHandler(), //
new InboundHandler(connection), //
new RpcExceptionHandler() //
);
@@ -75,26 +83,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
;
}
- protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
-
- protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
- private Class<T> responseType;
-
- public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
- super(handshakeType, parser);
- this.responseType = responseType;
- }
-
- @Override
- protected final void consumeHandshake(Channel c, T msg) throws Exception {
- validateHandshake(msg);
- queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
- }
-
- protected abstract void validateHandshake(T msg) throws Exception;
-
- }
-
+ protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
+ protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
+
protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
return new ChannelClosedHandler();
}
@@ -105,6 +96,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
"This shouldn't be used in client mode as a client only has a single connection.");
}
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
+ T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
+ }
+
protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
@@ -115,65 +111,91 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
return true;
}
- /**
- * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom. Should be cleaned up.
- */
- private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
- final SettableFuture<RECEIVE> future;
- T handshakeType;
- SEND handshakeValue;
- String host;
- int port;
- Class<RECEIVE> responseClass;
-
- public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
- super();
- assert host != null && !host.isEmpty();
- assert port > 0;
- logger.debug("Creating new handshake thread to connec to {}:{}", host, port);
- this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
- future = SettableFuture.create();
- this.handshakeType = handshakeType;
+ protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+ ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
+ b.connect(host, port).addListener(cml.connectionHandler);
+ }
+
+ private class ConnectionMultiListener {
+ private final RpcConnectionHandler<R> l;
+ private final HANDSHAKE_SEND handshakeValue;
+
+ public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
+ assert l != null;
+ assert handshakeValue != null;
+
+ this.l = l;
this.handshakeValue = handshakeValue;
- this.host = host;
- this.port = port;
- this.responseClass = responseClass;
}
- @Override
- public void run() {
- try {
- logger.debug("Starting to get client connection on host {}, port {}.", host, port);
-
- ChannelFuture f = b.connect(host, port);
- f.sync();
- if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
- connect = !connect;
- logger.debug("Client connected, sending handshake.");
- DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
- future.set(fut.checkedGet());
- logger.debug("Got bit client connection.");
- } catch (Exception e) {
- logger.debug("Failed to get client connection.", e);
- future.setException(e);
+ public final ConnectionHandler connectionHandler = new ConnectionHandler();
+ public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
+
+ /**
+ * Manages connection establishment outcomes.
+ */
+ private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
+ try {
+ future.get();
+ if (future.isSuccess()) {
+ send(handshakeSendHandler, handshakeType, handshakeValue, responseClass);
+ } else {
+ l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ }
+// logger.debug("Handshake queued for send.");
+ } catch (Exception ex) {
+ l.connectionFailed(FailureType.CONNECTION, ex);
+ }
}
}
+ /**
+ * Manages handshake outcomes.
+ */
+ private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.debug("Failure while initiating handshake", ex);
+ l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
+
+ @Override
+ public void success(HANDSHAKE_RESPONSE value) {
+// logger.debug("Handshake received. {}", value);
+ try {
+ BasicClient.this.validateHandshake(value);
+ BasicClient.this.finalizeConnection(value, connection);
+ BasicClient.this.connect = true;
+ l.connectionSucceeded(connection);
+// logger.debug("Handshake completed successfully.");
+ } catch (RpcException ex) {
+ l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
+ }
+ }
+
+ }
+
}
- protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
- SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
- RpcException {
-
-
- HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
- ht.start();
- try{
- return ht.future.get();
- }catch(Exception e){
- throw new RpcException(e);
+ private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
+
+ public ClientHandshakeHandler() {
+ super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
}
-
+
+ @Override
+ protected final void consumeHandshake(Channel c, HANDSHAKE_RESPONSE msg) throws Exception {
+ // remove the handshake information from the queue so it doesn't sit there forever.
+ RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
+ responseClass);
+ response.set(msg);
+ }
+
}
public void close() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 0e62f14..2028db6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -26,13 +26,16 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
import com.google.protobuf.Internal.EnumLite;
-public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
- public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(rpcMapping, alloc, eventLoopGroup);
+ public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+ super(rpcMapping, alloc, eventLoopGroup, handshakeType, responseClass, handshakeParser);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 52bb0a2..af5d9c9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -72,7 +72,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
new ZeroCopyProtobufLengthDecoder(), //
new RpcDecoder(rpcConfig.getName()), //
new RpcEncoder(rpcConfig.getName()), //
- getHandshakeHandler(),
+ getHandshakeHandler(connection),
new InboundHandler(connection), //
new RpcExceptionHandler() //
);
@@ -88,7 +88,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
}
- protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
+ protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
@@ -104,9 +104,6 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
new file mode 100644
index 0000000..27e9dee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public interface ChannelListenerWithCoordinationId extends GenericFutureListener<ChannelFuture>{
+ public int getCoordinationId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 70142bb..9edbe11 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -17,8 +17,12 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
@@ -29,31 +33,93 @@ public class CoordinationQueue {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
- private final Map<Integer, DrillRpcFutureImpl<?>> map;
+ private final Map<Integer, RpcOutcome<?>> map;
public CoordinationQueue(int segmentSize, int segmentCount) {
- map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
+ map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
}
- void channelClosed(Exception ex) {
- for (DrillRpcFutureImpl<?> f : map.values()) {
- f.setException(ex);
+ void channelClosed(Throwable ex) {
+ if(ex != null){
+ RpcException e;
+ if(ex instanceof RpcException){
+ e = (RpcException) ex;
+ }else{
+ e = new RpcException(ex);
+ }
+ for (RpcOutcome<?> f : map.values()) {
+ f.setException(e);
+ }
}
}
- public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+ public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
int i = circularInt.getNext();
- DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
- // logger.debug("Writing to map coord {}, future {}", i, future);
+ RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
Object old = map.put(i, future);
if (old != null)
throw new IllegalStateException(
"You attempted to reuse a coordination id when the previous coordination id has not been removed. This is likely rpc future callback memory leak.");
return future;
}
+
+ private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
+ final RpcOutcomeListener<T> handler;
+ final Class<T> clazz;
+ final int coordinationId;
+
+ public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+ super();
+ this.handler = handler;
+ this.clazz = clazz;
+ this.coordinationId = coordinationId;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(!future.isSuccess()){
+ removeFromMap(coordinationId);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void set(Object value) {
+ assert clazz.isAssignableFrom(value.getClass());
+ handler.success( (T) value);
+ }
+
+ @Override
+ public void setException(Throwable t) {
+ handler.failed(RpcException.mapException(t));
+ }
+
+ @Override
+ public Class<T> getOutcomeType() {
+ return clazz;
+ }
+
+ @Override
+ public int getCoordinationId() {
+ return coordinationId;
+ }
+
+
+ }
+//
+// public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+// int i = circularInt.getNext();
+// DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
+// // logger.debug("Writing to map coord {}, future {}", i, future);
+// Object old = map.put(i, future);
+// if (old != null)
+// throw new IllegalStateException(
+// "You attempted to reuse a coordination id when the previous coordination id has not been removed. This is likely rpc future callback memory leak.");
+// return future;
+// }
- private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
- DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
+ private RpcOutcome<?> removeFromMap(int coordinationId) {
+ RpcOutcome<?> rpc = map.remove(coordinationId);
if (rpc == null) {
logger.error("Rpc is null.");
throw new IllegalStateException(
@@ -62,11 +128,11 @@ public class CoordinationQueue {
return rpc;
}
- public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
+ public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
// logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
- DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+ RpcOutcome<?> rpc = removeFromMap(coordinationId);
// logger.debug("Got rpc from map {}", rpc);
- Class<?> outcomeClass = rpc.getOutcomeClass();
+ Class<?> outcomeClass = rpc.getOutcomeType();
if (outcomeClass != clazz) {
@@ -80,7 +146,7 @@ public class CoordinationQueue {
}
@SuppressWarnings("unchecked")
- DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
+ RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
// logger.debug("Returning casted future");
return crpc;
@@ -88,7 +154,7 @@ public class CoordinationQueue {
public void updateFailedFuture(int coordinationId, RpcFailure failure) {
// logger.debug("Updating failed future.");
- DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+ RpcOutcome<?> rpc = removeFromMap(coordinationId);
rpc.setException(new RemoteRpcException(failure));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index bae947a..9033ea1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -21,6 +21,4 @@ import com.google.common.util.concurrent.CheckedFuture;
public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
-
- public void addLightListener(RpcOutcomeListener<T> outcomeListener);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index ee14eeb..d5d3a9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -22,22 +22,12 @@ import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
- final int coordinationId;
- private final Class<V> clazz;
-
- public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
- super(delegate);
- this.coordinationId = coordinationId;
- this.clazz = clazz;
- }
-
- public Class<V> getOutcomeClass(){
- return clazz;
+ public DrillRpcFutureImpl() {
+ super(new InnerFuture<V>());
}
/**
@@ -53,24 +43,7 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
@Override
protected RpcException mapException(Exception ex) {
- Throwable e = ex;
- while(e instanceof ExecutionException){
- e = e.getCause();
- }
- if (e instanceof RpcException) return (RpcException) e;
-
- return new RpcException(ex);
-
- }
-
- @SuppressWarnings("unchecked")
- void setValue(Object value) {
- assert clazz.isAssignableFrom(value.getClass());
- ((InnerFuture<V>) super.delegate()).setValue((V) value);
- }
-
- boolean setException(Throwable t) {
- return ((InnerFuture<V>) super.delegate()).setException(t);
+ return RpcException.mapException(ex);
}
public static class InnerFuture<T> extends AbstractFuture<T> {
@@ -85,34 +58,17 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
}
}
- public class RpcOutcomeListenerWrapper implements Runnable{
- final RpcOutcomeListener<V> inner;
-
- public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
- super();
- this.inner = inner;
- }
-
- @Override
- public void run() {
- try{
- inner.success(DrillRpcFutureImpl.this.checkedGet());
- }catch(RpcException e){
- inner.failed(e);
- }
- }
- }
-
- public void addLightListener(RpcOutcomeListener<V> outcomeListener){
- this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+ @Override
+ public void failed(RpcException ex) {
+ ( (InnerFuture<V>)delegate()).setException(ex);
}
-
-
-
- public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
- InnerFuture<V> f = new InnerFuture<V>();
- return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+
+ @Override
+ public void success(V value) {
+ ( (InnerFuture<V>)delegate()).setValue(value);
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 11764db..a680a97 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -64,6 +64,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
+ /**
+  * Sends a request and returns a future for its response. This overload creates a
+  * {@code DrillRpcFutureImpl}, passes it as the outcome listener to the listener-based
+  * {@code send} overload, and returns it to the caller.
+  */
public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+   // The future doubles as the RpcOutcomeListener for this request.
+   final DrillRpcFutureImpl<RECEIVE> pendingResponse = new DrillRpcFutureImpl<RECEIVE>();
+   send(pendingResponse, connection, rpcType, protobufBody, clazz, dataBodies);
+   return pendingResponse;
+ }
+
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+
+
+
assert !Arrays.asList(dataBodies).contains(null);
assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
@@ -72,14 +82,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
boolean completed = false;
try {
- // logger.debug("Seding message");
Preconditions.checkNotNull(protobufBody);
- DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
- OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+ ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+ OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
ChannelFuture channelFuture = connection.getChannel().write(m);
- channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
+ channelFuture.addListener(futureListener);
completed = true;
- return rpcFuture;
} finally {
if (!completed) {
if (pBuffer != null) pBuffer.release();
@@ -140,10 +148,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
case RESPONSE:
MessageLite m = getResponseDefaultInstance(msg.rpcType);
assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+ RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
Parser<?> parser = m.getParserForType();
Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- rpcFuture.setValue(value);
+ rpcFuture.set(value);
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
break;
@@ -162,39 +170,39 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
- private class Listener implements GenericFutureListener<ChannelFuture> {
-
- private int coordinationId;
- private Class<?> clazz;
-
- public Listener(int coordinationId, Class<?> clazz) {
- this.coordinationId = coordinationId;
- this.clazz = clazz;
- }
-
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- // logger.debug("Completed channel write.");
-
- if (channelFuture.isCancelled()) {
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
- rpcFuture.setException(new CancellationException("Socket operation was canceled."));
- } else if (!channelFuture.isSuccess()) {
- try {
- channelFuture.get();
- throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
- } catch (Exception e) {
- logger.error("Error occurred during Rpc", e);
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
- rpcFuture.setException(e);
- }
- } else {
- // send was successful. No need to modify DrillRpcFuture.
- return;
- }
- }
-
- }
+// private class Listener implements GenericFutureListener<ChannelFuture> {
+//
+// private int coordinationId;
+// private Class<?> clazz;
+//
+// public Listener(int coordinationId, Class<?> clazz) {
+// this.coordinationId = coordinationId;
+// this.clazz = clazz;
+// }
+//
+// @Override
+// public void operationComplete(ChannelFuture channelFuture) throws Exception {
+// // logger.debug("Completed channel write.");
+//
+// if (channelFuture.isCancelled()) {
+// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+// rpcFuture.setException(new CancellationException("Socket operation was canceled."));
+// } else if (!channelFuture.isSuccess()) {
+// try {
+// channelFuture.get();
+// throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
+// } catch (Exception e) {
+// logger.error("Error occurred during Rpc", e);
+// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+// rpcFuture.setException(e);
+// }
+// } else {
+// // send was successful. No need to modify DrillRpcFuture.
+// return;
+// }
+// }
+//
+// }
public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
new file mode 100644
index 0000000..7c300d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link DrillRpcFuture} that wraps an existing {@link ListenableFuture} and uses
+ * {@link RpcException#mapException(Throwable)} as its exception mapping.
+ */
+public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
+ /** @param delegate the future this checked future wraps */
+ public RpcCheckedFuture(ListenableFuture<T> delegate) {
+ super(delegate);
+ }
+
+ /** Delegates to {@code RpcException.mapException} to obtain the {@link RpcException} for {@code e}. */
+ @Override
+ protected RpcException mapException(Exception e) {
+ return RpcException.mapException(e);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
new file mode 100644
index 0000000..0f55488
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+/**
+ * Callback interface with one method for a successful connection and one for a failed one.
+ *
+ * @param <T> the connection type passed to {@link #connectionSucceeded}
+ */
+public interface RpcConnectionHandler<T extends RemoteConnection> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
+
+ /** Kind of failure passed to {@link #connectionFailed}. */
+ enum FailureType {
+   CONNECTION,
+   HANDSHAKE_COMMUNICATION,
+   HANDSHAKE_VALIDATION
+ }
+
+ /** @param connection the connection being reported as succeeded */
+ void connectionSucceeded(T connection);
+
+ /**
+  * @param type the kind of failure
+  * @param t the throwable accompanying the failure
+  */
+ void connectionFailed(FailureType type, Throwable t);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index ca66481..500f959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import java.util.concurrent.ExecutionException;
+
import org.apache.drill.common.exceptions.DrillIOException;
/**
@@ -41,5 +43,16 @@ public class RpcException extends DrillIOException{
super(cause);
}
+ /**
+  * Converts an arbitrary throwable into an {@code RpcException}.
+  * Any chain of {@link ExecutionException} wrappers is unwrapped first; if what remains is
+  * already an {@code RpcException} it is returned as-is, otherwise it becomes the cause of
+  * a new {@code RpcException}.
+  *
+  * @param t the throwable to convert
+  * @return the unwrapped {@code RpcException}, or a new one wrapping the unwrapped throwable
+  */
+ public static RpcException mapException(Throwable t){
+   Throwable unwrapped = t;
+   while (unwrapped instanceof ExecutionException) {
+     unwrapped = unwrapped.getCause();
+   }
+   if (unwrapped instanceof RpcException) {
+     return (RpcException) unwrapped;
+   }
+   return new RpcException(unwrapped);
+ }
+
+ /**
+  * Builds a new {@code RpcException} with the given message, using as its cause the
+  * throwable found after unwrapping any chain of {@link ExecutionException} wrappers.
+  * Unlike the single-argument overload, a new exception is always created.
+  *
+  * @param message the message for the new exception
+  * @param t the throwable to unwrap and attach as the cause
+  * @return a new {@code RpcException} carrying {@code message} and the unwrapped cause
+  */
+ public static RpcException mapException(String message, Throwable t){
+   Throwable unwrapped = t;
+   while (unwrapped instanceof ExecutionException) {
+     unwrapped = unwrapped.getCause();
+   }
+   return new RpcException(message, unwrapped);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
new file mode 100644
index 0000000..a25e5e7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+/**
+ * Sink for the result of a pending RPC: it accepts either a value or a failure and
+ * exposes the class of value it expects.
+ *
+ * @param <T> the expected value type
+ */
+public interface RpcOutcome<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class);
+
+ /** @param value the received value, passed untyped */
+ void set(Object value);
+
+ /** @param t the failure to report */
+ void setException(Throwable t);
+
+ /** @return the class of value this outcome expects */
+ Class<T> getOutcomeType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index fac908c..771edcf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -17,11 +17,10 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
-public abstract class RpcOutcomeListener<V> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+public interface RpcOutcomeListener<V> {
- public void failed(RpcException ex){};
- public void success(V value){};
+ public void failed(RpcException ex);
+ public void success(V value);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 20a7d7d..318abb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -36,7 +36,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
if(!ctx.channel().isOpen()){
- logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+ if(in.readableBytes() > 0) logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
in.skipBytes(in.readableBytes());
return;
}
|