storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [20/23] storm git commit: Bring back graceful shutdown
Date Tue, 14 Jul 2015 19:06:51 GMT
Bring back graceful shutdown


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d6c260d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d6c260d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d6c260d

Branch: refs/heads/0.9.x-branch
Commit: 6d6c260da3663f1ab1dfee6c9333fcdc278e9b46
Parents: 5e13497
Author: Enno Shioji <eshioji@gmail.com>
Authored: Thu Jun 4 16:01:56 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Thu Jun 4 16:04:03 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 26 +++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6d6c260d/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c779733..166c1b2 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -61,6 +61,8 @@ import static com.google.common.base.Preconditions.checkState;
  *       the remote destination is currently unavailable.
  */
 public class Client extends ConnectionWithStatus implements IStatefulObject {
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
 
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
@@ -357,11 +359,33 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
-
+            waitForPendingMessagesToBeSent();
             closeChannel();
         }
     }
 
+    private void waitForPendingMessagesToBeSent() {
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+        long totalPendingMsgs = pendingMessages.get();
+        long startMs = System.currentTimeMillis();
+        while (pendingMessages.get() != 0) {
+            try {
+                long deltaMs = System.currentTimeMillis() - startMs;
+                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                    LOG.error("failed to send all pending messages to {} within timeout,
{} of {} messages were not " +
+                            "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                    break;
+                }
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
+            }
+            catch (InterruptedException e) {
+                break;
+            }
+        }
+
+    }
+
 
     private void closeChannel() {
         Channel channel = channelRef.get();


Mime
View raw message