drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [1/4] drill git commit: DRILL-3743: Fail active result listeners if server connection is closed
Date Thu, 07 Apr 2016 01:07:13 GMT
Repository: drill
Updated Branches:
  refs/heads/master e7e9b73c1 -> 9514cbe75


DRILL-3743: Fail active result listeners if server connection is closed

+ Remove dead code
+ Improve error and logging messages

closes #460


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/55d54eda
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/55d54eda
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/55d54eda

Branch: refs/heads/master
Commit: 55d54eda392961d858c171b737c6ce76cf9a9fb1
Parents: e7e9b73
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Tue Apr 5 16:04:44 2016 -0700
Committer: Sudheesh Katkam <skatkam@maprtech.com>
Committed: Wed Apr 6 11:37:36 2016 -0700

----------------------------------------------------------------------
 .../drill/exec/rpc/user/QueryResultHandler.java | 126 +++++++++++--------
 .../apache/drill/exec/rpc/user/UserClient.java  |   7 +-
 2 files changed, 78 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/55d54eda/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index ca73ac8..00a324b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
@@ -35,9 +34,10 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 import org.apache.drill.exec.rpc.ConnectionThrottle;
-import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
@@ -69,9 +69,13 @@ public class QueryResultHandler {
   private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap
=
       Maps.newConcurrentMap();
 
-  public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
-      UserResultsListener resultsListener) {
-    return new SubmissionListener(connection, resultsListener);
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener)
{
+    return new SubmissionListener(resultsListener);
+  }
+
+  public RpcConnectionHandler<ServerConnection> getWrappedConnectionHandler(
+      final RpcConnectionHandler<ServerConnection> handler) {
+    return new ChannelClosedHandler(handler);
   }
 
   /**
@@ -84,7 +88,10 @@ public class QueryResultHandler {
     final QueryId queryId = queryResult.getQueryId();
     final QueryState queryState = queryResult.getQueryState();
 
-    logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId );
+    if (logger.isDebugEnabled()) {
+      logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
+          QueryIdHelper.getQueryId(queryId));
+    }
 
     assert queryResult.hasQueryState() : "received query result without QueryState";
 
@@ -92,9 +99,6 @@ public class QueryResultHandler {
     // CANCELED queries are handled the same way as COMPLETED
     final boolean isTerminalResult;
     switch ( queryState ) {
-      case STARTING:
-        isTerminalResult = false;
-        break;
       case FAILED:
       case CANCELED:
       case COMPLETED:
@@ -154,7 +158,9 @@ public class QueryResultHandler {
 
     final QueryId queryId = queryData.getQueryId();
 
-    logger.debug( "batchArrived: queryId = {}", queryId );
+    if (logger.isDebugEnabled()) {
+      logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
+    }
     logger.trace( "batchArrived: batch = {}", batch );
 
     final UserResultsListener resultsListener = newUserResultsListener(queryId);
@@ -189,20 +195,10 @@ public class QueryResultHandler {
       if ( null == resultsListener ) {
         resultsListener = bl;
       }
-      // TODO:  Is there a more direct way to detect a Query ID in whatever state this string
comparison detects?
-      if ( queryId.toString().isEmpty() ) {
-        failAll();
-      }
     }
     return resultsListener;
   }
 
-  private void failAll() {
-    for (UserResultsListener l : queryIdToResultsListenersMap.values()) {
-      l.submissionFailed(UserException.systemError(new RpcException("Received result without
QueryId")).build(logger));
-    }
-  }
-
   private static class BufferingResultsListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
@@ -272,55 +268,41 @@ public class QueryResultHandler {
     @Override
     public void queryIdArrived(QueryId queryId) {
     }
-
   }
 
-
   private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+
     private final UserResultsListener resultsListener;
-    private final RemoteConnection connection;
-    private final ChannelFuture closeFuture;
-    private final ChannelClosedListener closeListener;
     private final AtomicBoolean isTerminal = new AtomicBoolean(false);
 
-    public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener)
{
-      super();
+    public SubmissionListener(UserResultsListener resultsListener) {
       this.resultsListener = resultsListener;
-      this.connection = connection;
-      this.closeFuture = connection.getChannel().closeFuture();
-      this.closeListener = new ChannelClosedListener();
-      closeFuture.addListener(closeListener);
-    }
-
-    private class ChannelClosedListener implements GenericFutureListener<Future<Void>>
{
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        resultsListener.submissionFailed(UserException.connectionError()
-            .message("Connection %s closed unexpectedly.", connection.getName())
-            .build(logger));
-      }
-
     }
 
     @Override
     public void failed(RpcException ex) {
       if (!isTerminal.compareAndSet(false, true)) {
+        logger.warn("Received multiple responses to run query request.");
         return;
       }
 
-      closeFuture.removeListener(closeListener);
-      resultsListener.submissionFailed(UserException.systemError(ex).build(logger));
-
+      // Although query submission failed, results might have arrived for this query.
+      // However, the results could not be transferred to this resultListener because
+      // there is no query id mapped to this resultListener. Look out for the warning
+      // message from ChannelClosedHandler in the client logs.
+      // TODO(DRILL-4586)
+      resultsListener.submissionFailed(UserException.systemError(ex)
+          .addContext("Query submission to Drillbit failed.")
+          .build(logger));
     }
 
     @Override
     public void success(QueryId queryId, ByteBuf buf) {
       if (!isTerminal.compareAndSet(false, true)) {
+        logger.warn("Received multiple responses to run query request.");
         return;
       }
 
-      closeFuture.removeListener(closeListener);
       resultsListener.queryIdArrived(queryId);
       if (logger.isDebugEnabled()) {
         logger.debug("Received QueryId {} successfully. Adding results listener {}.",
@@ -354,17 +336,57 @@ public class QueryResultHandler {
 
     @Override
     public void interrupted(final InterruptedException ex) {
-      logger.warn("Interrupted while waiting for query results from Drillbit", ex);
-
       if (!isTerminal.compareAndSet(false, true)) {
+        logger.warn("Received multiple responses to run query request.");
         return;
       }
 
-      closeFuture.removeListener(closeListener);
-
-      // Throw an interrupted UserException?
-      resultsListener.submissionFailed(UserException.systemError(ex).build(logger));
+      // TODO(DRILL-4586)
+      resultsListener.submissionFailed(UserException.systemError(ex)
+          .addContext("The client had been asked to wait as the Drillbit is potentially being
over-utilized." +
+              " But the client was interrupted while waiting.")
+          .build(logger));
     }
   }
 
+  /**
+   * When a {@link ServerConnection connection} to a server is successfully created, this
handler adds a
+   * listener to that connection that listens to connection closure. If the connection is
closed, all active
+   * {@link UserResultsListener result listeners} are failed.
+   */
+  private class ChannelClosedHandler implements RpcConnectionHandler<ServerConnection>
{
+
+    private final RpcConnectionHandler<ServerConnection> parentHandler;
+
+    public ChannelClosedHandler(final RpcConnectionHandler<ServerConnection> parentHandler)
{
+      this.parentHandler = parentHandler;
+    }
+
+    @Override
+    public void connectionSucceeded(final ServerConnection connection) {
+      connection.getChannel().closeFuture().addListener(
+          new GenericFutureListener<Future<? super Void>>() {
+            @Override
+            public void operationComplete(Future<? super Void> future)
+                throws Exception {
+              for (final UserResultsListener listener : queryIdToResultsListenersMap.values())
{
+                listener.submissionFailed(UserException.connectionError()
+                    .message("Connection %s closed unexpectedly. Drillbit down?",
+                        connection.getName())
+                    .build(logger));
+                if (listener instanceof BufferingResultsListener) {
+                  // the appropriate listener will be failed by SubmissionListener#failed
+                  logger.warn("Buffering listener failed before results were transferred
to the actual listener.");
+                }
+              }
+            }
+          });
+      parentHandler.connectionSucceeded(connection);
+    }
+
+    @Override
+    public void connectionFailed(FailureType type, Throwable t) {
+      parentHandler.connectionFailed(type, t);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/55d54eda/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 824e6eb..5ff6a6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -67,11 +67,11 @@ public class UserClient extends BasicClientWithConnection<RpcType,
UserToBitHand
   }
 
   public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
-    send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY,
query, QueryId.class);
+    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query,
QueryId.class);
   }
 
   public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint
endpoint,
-      UserProperties props, UserBitShared.UserCredentials credentials) {
+                      UserProperties props, UserBitShared.UserCredentials credentials) {
     UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
@@ -83,7 +83,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
       hsBuilder.setProperties(props);
     }
 
-    this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
+    this.connectAsClient(queryResultHandler.getWrappedConnectionHandler(handler),
+        hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort());
   }
 
   @Override


Mime
View raw message