storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [11/23] storm git commit: Log error message for dropping messages only once per connection error (logging it every time on send was flooding the log).
Date Tue, 14 Jul 2015 19:06:42 GMT
Log error message for dropping messages only once per connection error (logging it every time
on send was flooding the log).


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad8112d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad8112d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad8112d1

Branch: refs/heads/0.9.x-branch
Commit: ad8112d10d662ae81498d11f78a602b97243a142
Parents: aa5c2d7
Author: Enno Shioji <eshioji@gmail.com>
Authored: Sun May 31 00:54:31 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Sun May 31 00:54:31 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 24 ++++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ad8112d1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 340d43b..187bba3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -233,7 +233,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         Channel channel = channelRef.get();
         if (!connectionEstablished(channel)) {
             // Closing the channel and reconnecting should be done before handling the messages.
-            closeChannelAndReconnect(channel);
+            boolean reconnectScheduled = closeChannelAndReconnect(channel);
+            if(reconnectScheduled){
+                // Log the connection error only once
+                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+            }
             handleMessagesWhenConnectionIsUnavailable(msgs);
             return;
         }
@@ -267,7 +271,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * succeed  or not, and how long the recovery will take.
      */
     private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
-        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
         dropMessages(msgs);
     }
 
@@ -275,7 +278,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-        LOG.error("dropping {} message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
@@ -318,13 +320,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         });
     }
 
-    private void closeChannelAndReconnect(Channel channel) {
+    /**
+     * Schedule a reconnect if we closed a non-null channel, and acquired the right to
+     * provide a replacement
+     * @param channel
+     * @return if the call scheduled a re-connect task
+     */
+    private boolean closeChannelAndReconnect(Channel channel) {
         if (channel != null) {
             channel.close();
             if (channelRef.compareAndSet(channel, null)) {
                 scheduleConnect(NO_DELAY_MS);
+                return true;
             }
         }
+        return false;
     }
 
     private boolean containsMessages(MessageBatch batch) {
@@ -431,6 +441,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                                 checkState(setChannel);
                                 LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
                                         connectionAttempt);
+                                if(messagesLost.get() > 0){
+                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                                }
                             } else {
                                 Throwable cause = future.getCause();
                                 reschedule(cause);
@@ -443,7 +456,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                 } else {
                     close();
                     throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                            connectionAttempts + " failed attempts");
+                            connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+
                 }
             } catch (Throwable e) {
                 LOG.error("Uncaught throwable", e);


Mime
View raw message