Repository: drill
Updated Branches:
refs/heads/master e7e9b73c1 -> 9514cbe75
DRILL-3743: Fail active result listeners if server connection is closed
+ Remove dead code
+ Improve error and logging messages
closes #460
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55d54eda
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55d54eda
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55d54eda
Branch: refs/heads/master
Commit: 55d54eda392961d858c171b737c6ce76cf9a9fb1
Parents: e7e9b73
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Tue Apr 5 16:04:44 2016 -0700
Committer: Sudheesh Katkam <skatkam@maprtech.com>
Committed: Wed Apr 6 11:37:36 2016 -0700
----------------------------------------------------------------------
.../drill/exec/rpc/user/QueryResultHandler.java | 126 +++++++++++--------
.../apache/drill/exec/rpc/user/UserClient.java | 7 +-
2 files changed, 78 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/55d54eda/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index ca73ac8..00a324b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -35,9 +34,10 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
-import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
@@ -69,9 +69,13 @@ public class QueryResultHandler {
private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
Maps.newConcurrentMap();
- public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
- UserResultsListener resultsListener) {
- return new SubmissionListener(connection, resultsListener);
+ public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
+ return new SubmissionListener(resultsListener);
+ }
+
+ public RpcConnectionHandler<ServerConnection> getWrappedConnectionHandler(
+ final RpcConnectionHandler<ServerConnection> handler) {
+ return new ChannelClosedHandler(handler);
}
/**
@@ -84,7 +88,10 @@ public class QueryResultHandler {
final QueryId queryId = queryResult.getQueryId();
final QueryState queryState = queryResult.getQueryState();
- logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId );
+ if (logger.isDebugEnabled()) {
+ logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
+ QueryIdHelper.getQueryId(queryId));
+ }
assert queryResult.hasQueryState() : "received query result without QueryState";
@@ -92,9 +99,6 @@ public class QueryResultHandler {
// CANCELED queries are handled the same way as COMPLETED
final boolean isTerminalResult;
switch ( queryState ) {
- case STARTING:
- isTerminalResult = false;
- break;
case FAILED:
case CANCELED:
case COMPLETED:
@@ -154,7 +158,9 @@ public class QueryResultHandler {
final QueryId queryId = queryData.getQueryId();
- logger.debug( "batchArrived: queryId = {}", queryId );
+ if (logger.isDebugEnabled()) {
+ logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
+ }
logger.trace( "batchArrived: batch = {}", batch );
final UserResultsListener resultsListener = newUserResultsListener(queryId);
@@ -189,20 +195,10 @@ public class QueryResultHandler {
if ( null == resultsListener ) {
resultsListener = bl;
}
- // TODO: Is there a more direct way to detect a Query ID in whatever state this string comparison detects?
- if ( queryId.toString().isEmpty() ) {
- failAll();
- }
}
return resultsListener;
}
- private void failAll() {
- for (UserResultsListener l : queryIdToResultsListenersMap.values()) {
- l.submissionFailed(UserException.systemError(new RpcException("Received result without QueryId")).build(logger));
- }
- }
-
private static class BufferingResultsListener implements UserResultsListener {
private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
@@ -272,55 +268,41 @@ public class QueryResultHandler {
@Override
public void queryIdArrived(QueryId queryId) {
}
-
}
-
private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+
private final UserResultsListener resultsListener;
- private final RemoteConnection connection;
- private final ChannelFuture closeFuture;
- private final ChannelClosedListener closeListener;
private final AtomicBoolean isTerminal = new AtomicBoolean(false);
- public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
- super();
+ public SubmissionListener(UserResultsListener resultsListener) {
this.resultsListener = resultsListener;
- this.connection = connection;
- this.closeFuture = connection.getChannel().closeFuture();
- this.closeListener = new ChannelClosedListener();
- closeFuture.addListener(closeListener);
- }
-
- private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {
-
- @Override
- public void operationComplete(Future<Void> future) throws Exception {
- resultsListener.submissionFailed(UserException.connectionError()
- .message("Connection %s closed unexpectedly.", connection.getName())
- .build(logger));
- }
-
}
@Override
public void failed(RpcException ex) {
if (!isTerminal.compareAndSet(false, true)) {
+ logger.warn("Received multiple responses to run query request.");
return;
}
- closeFuture.removeListener(closeListener);
- resultsListener.submissionFailed(UserException.systemError(ex).build(logger));
-
+ // Although query submission failed, results might have arrived for this query.
+ // However, the results could not be transferred to this resultListener because
+ // there is no query id mapped to this resultListener. Look out for the warning
+ // message from ChannelClosedHandler in the client logs.
+ // TODO(DRILL-4586)
+ resultsListener.submissionFailed(UserException.systemError(ex)
+ .addContext("Query submission to Drillbit failed.")
+ .build(logger));
}
@Override
public void success(QueryId queryId, ByteBuf buf) {
if (!isTerminal.compareAndSet(false, true)) {
+ logger.warn("Received multiple responses to run query request.");
return;
}
- closeFuture.removeListener(closeListener);
resultsListener.queryIdArrived(queryId);
if (logger.isDebugEnabled()) {
logger.debug("Received QueryId {} successfully. Adding results listener {}.",
@@ -354,17 +336,57 @@ public class QueryResultHandler {
@Override
public void interrupted(final InterruptedException ex) {
- logger.warn("Interrupted while waiting for query results from Drillbit", ex);
-
if (!isTerminal.compareAndSet(false, true)) {
+ logger.warn("Received multiple responses to run query request.");
return;
}
- closeFuture.removeListener(closeListener);
-
- // Throw an interrupted UserException?
- resultsListener.submissionFailed(UserException.systemError(ex).build(logger));
+ // TODO(DRILL-4586)
+ resultsListener.submissionFailed(UserException.systemError(ex)
+ .addContext("The client had been asked to wait as the Drillbit is potentially being over-utilized." +
+ " But the client was interrupted while waiting.")
+ .build(logger));
}
}
+ /**
+ * When a {@link ServerConnection connection} to a server is successfully created, this handler adds a
+ * listener to that connection that listens to connection closure. If the connection is closed, all active
+ * {@link UserResultsListener result listeners} are failed.
+ */
+ private class ChannelClosedHandler implements RpcConnectionHandler<ServerConnection> {
+
+ private final RpcConnectionHandler<ServerConnection> parentHandler;
+
+ public ChannelClosedHandler(final RpcConnectionHandler<ServerConnection> parentHandler) {
+ this.parentHandler = parentHandler;
+ }
+
+ @Override
+ public void connectionSucceeded(final ServerConnection connection) {
+ connection.getChannel().closeFuture().addListener(
+ new GenericFutureListener<Future<? super Void>>() {
+ @Override
+ public void operationComplete(Future<? super Void> future)
+ throws Exception {
+ for (final UserResultsListener listener : queryIdToResultsListenersMap.values()) {
+ listener.submissionFailed(UserException.connectionError()
+ .message("Connection %s closed unexpectedly. Drillbit down?",
+ connection.getName())
+ .build(logger));
+ if (listener instanceof BufferingResultsListener) {
+ // the appropriate listener will be failed by SubmissionListener#failed
+ logger.warn("Buffering listener failed before results were transferred to the actual listener.");
+ }
+ }
+ }
+ });
+ parentHandler.connectionSucceeded(connection);
+ }
+
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ parentHandler.connectionFailed(type, t);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/55d54eda/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 824e6eb..5ff6a6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -67,11 +67,11 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
}
public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
- send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+ send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint,
- UserProperties props, UserBitShared.UserCredentials credentials) {
+ UserProperties props, UserBitShared.UserCredentials credentials) {
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
@@ -83,7 +83,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
hsBuilder.setProperties(props);
}
- this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
+ this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler),
+ hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
}
@Override
|