storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [10/23] storm git commit: Do not block in Connector. This task runs on a single (by default) thread that is shared among all Clients. If the task blocks, other reconnection requests can't be processed, resulting in a lot of messages being dropped. By not
Date Tue, 14 Jul 2015 19:06:41 GMT
Do not block in Connector. This task runs on a single thread (by default) that is shared among
all Clients. If the task blocks, other reconnection requests cannot be processed, resulting
in many messages being dropped. By not blocking, the thread should be able to service
reconnection requests much more quickly.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa5c2d71
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa5c2d71
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa5c2d71

Branch: refs/heads/0.9.x-branch
Commit: aa5c2d719bb3913285d4274cfcf8364df958b1ff
Parents: 884f496
Author: Enno Shioji <eshioji@gmail.com>
Authored: Fri May 29 14:40:47 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Fri May 29 14:40:47 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 39 ++++++++++++--------
 1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aa5c2d71/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f2953fc..340d43b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -390,6 +390,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
 
     /**
      * Asynchronously establishes a Netty connection to the remote address
+     * This task runs on a single (by default) thread shared among all clients, and thus
+     * should not perform operations that block.
      */
     private class Connector implements Runnable {
 
@@ -399,7 +401,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
             this.address = address;
         }
 
-        private void reconnectAgain(Throwable t) {
+        private void reschedule(Throwable t) {
             String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
                     dstAddressPrefixedName);
             String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
@@ -413,26 +415,31 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         public void run() {
             try {
                 if (reconnectingAllowed()) {
-                    int connectionAttempt = connectionAttempts.getAndIncrement();
+                    final int connectionAttempt = connectionAttempts.getAndIncrement();
                     totalConnectionAttempts.getAndIncrement();
 
                     LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
                     ChannelFuture future = bootstrap.connect(address);
-                    future.awaitUninterruptibly();
-                    Channel newChannel = future.getChannel();
-
-                    if (future.isSuccess() && connectionEstablished(newChannel))
{
-                        boolean setChannel = channelRef.compareAndSet(null, newChannel);
-                        checkState(setChannel);
-                        LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(),
newChannel.toString(),
-                                connectionAttempt);
-                    } else {
-                        Throwable cause = future.getCause();
-                        reconnectAgain(cause);
-                        if (newChannel != null) {
-                            newChannel.close();
+                    future.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception
{
+                            // This call returns immediately
+                            Channel newChannel = future.getChannel();
+
+                            if (future.isSuccess() && connectionEstablished(newChannel))
{
+                                boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                                checkState(setChannel);
+                                LOG.debug("successfully connected to {}, {} [attempt {}]",
address.toString(), newChannel.toString(),
+                                        connectionAttempt);
+                            } else {
+                                Throwable cause = future.getCause();
+                                reschedule(cause);
+                                if (newChannel != null) {
+                                    newChannel.close();
+                                }
+                            }
                         }
-                    }
+                    });
                 } else {
                     close();
                     throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName
+ " after " +


Mime
View raw message