storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/23] storm git commit: Simplified the flow and removed the lock that was causing the deadlock
Date Tue, 14 Jul 2015 19:06:32 GMT
Repository: storm
Updated Branches:
  refs/heads/0.9.x-branch f38240342 -> ff42a3deb


Simplified the flow and removed the lock that was causing the deadlock


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ed8ab3ec
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ed8ab3ec
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ed8ab3ec

Branch: refs/heads/0.9.x-branch
Commit: ed8ab3ec194f19c75fc2f5c000609204f04b50e8
Parents: 00091d7
Author: Enno Shioji <eshioji@gmail.com>
Authored: Thu May 28 20:42:05 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Thu May 28 20:42:05 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 376 ++++++-------------
 .../storm/messaging/netty/MessageBatch.java     |   4 +
 2 files changed, 120 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ed8ab3ec/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index afce496..4e97035 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,10 +18,14 @@
 package backtype.storm.messaging.netty;
 
 import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.messaging.TaskMessage;
+import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -29,36 +33,21 @@ import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.*;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.ConnectionWithStatus;
-import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
-import backtype.storm.utils.Utils;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A Netty client for sending task messages to a remote destination (Netty server).
@@ -81,9 +70,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
     private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
-    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
-    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
-    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
 
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
@@ -93,7 +79,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     /**
      * The channel used for all write operations from this client to the remote destination.
      */
-    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
 
 
     /**
@@ -126,66 +112,51 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final AtomicLong pendingMessages = new AtomicLong(0);
 
+
     /**
      * This flag is set to true if and only if a client instance is being closed.
      */
     private volatile boolean closing = false;
 
     /**
-     * When set to true, then the background flusher thread will flush any pending messages
on its next run.
-     */
-    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
-    
-    /**
-     * The absolute time (in ms) when the next background flush should be performed.
-     *
-     * Note: The flush operation will only be performed if backgroundFlushingEnabled is true,
too.
-     */
-    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
-
-    /**
-     * The time interval (in ms) at which the background flusher thread will be run to check
for any pending messages
-     * to be flushed.
-     */
-    private final int flushCheckIntervalMs;
-    
-    /**
      * How many messages should be batched together before sending them to the remote destination.
      *
      * Messages are batched to optimize network throughput at the expense of latency.
      */
     private final int messageBatchSize;
 
-    private MessageBatch messageBatch = null;
     private final ListeningScheduledExecutorService scheduler;
     protected final Map stormConf;
 
+    private AtomicReference<MessageBatch> pendingMessageBatch;
+
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String
host, int port) {
         closing = false;
         this.stormConf = stormConf;
-        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
+        this.scheduler = MoreExecutors.listeningDecorator(scheduler);
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port,
bufferSize);
         messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
262144);
-        flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS),
10);
+        int flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS),
10);
 
         maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
+        pendingMessageBatch = new AtomicReference<MessageBatch>(new MessageBatch(messageBatchSize));
+
         // Initiate connection to remote destination
         bootstrap = createClientBootstrap(factory, bufferSize);
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
-        connect(NO_DELAY_MS);
-        
+        scheduleConnect(NO_DELAY_MS);
+
         // Launch background flushing thread
-        pauseBackgroundFlushing();
         long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
         scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
-            TimeUnit.MILLISECONDS);
+                TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize)
{
@@ -203,114 +174,34 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         }
         return "";
     }
-        
+
     private Runnable createBackgroundFlusher() {
         return new Runnable() {
             @Override
-            public void run() {   
-                if(!closing && backgroundFlushingEnabled.get() && nowMillis()
> nextBackgroundFlushTimeMs.get()) {
-                    LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
-                        dstAddressPrefixedName);
+            public void run() {
+                if (!closing) {
+                    LOG.debug("flushing pending messages to {} in background", dstAddressPrefixedName);
                     flushPendingMessages();
                 }
             }
         };
     }
 
-    private void pauseBackgroundFlushing() {
-        backgroundFlushingEnabled.set(false);
-            }
-        
-    private void resumeBackgroundFlushing() {
-        backgroundFlushingEnabled.set(true);
-    }
-
-    private synchronized void flushPendingMessages() {
-                        Channel channel = channelRef.get();
-        if (containsMessages(messageBatch)) {
-            if (connectionEstablished(channel)) {
-                if (channel.isWritable()) {
-                    pauseBackgroundFlushing();
-                    MessageBatch toBeFlushed = messageBatch;
-                    flushMessages(channel, toBeFlushed);
-                    messageBatch = null;
-                }
-                else if (closing) {
-                    // Ensure background flushing is enabled so that we definitely have a
chance to re-try the flush
-                    // operation in case the client is being gracefully closed (where we
have a brief time window where
-                    // the client will wait for pending messages to be sent).
-                    resumeBackgroundFlushing();
-                }
-                        }
-            else {
-                closeChannelAndReconnect(channel);
-                    }
-                }
-            }
-        
-    private long nowMillis() {
-        return System.currentTimeMillis();
+    private void flushPendingMessages() {
+        Channel channel = channelRef.get();
+        if (connectionEstablished(channel)) {
+            MessageBatch toFlush = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
+            flushMessages(channel, toFlush);
+        } else {
+            closeChannelAndReconnect(channel);
+        }
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    private synchronized void connect(long delayMs) {
-        try {
-            if (closing) {
-                return;
-            }
-
-            if (connectionEstablished(channelRef.get())) {
-                return;
-            }
-
-            connectionAttempts.getAndIncrement();
-            if (reconnectingAllowed()) {
-                totalConnectionAttempts.getAndIncrement();
-                LOG.info("connection attempt {} to {} scheduled to run in {} ms", connectionAttempts.get(),
-                    dstAddressPrefixedName, delayMs);
-                ListenableFuture<Channel> channelFuture = scheduler.schedule(
-                    new Connector(dstAddress, connectionAttempts.get()), delayMs, TimeUnit.MILLISECONDS);
-                Futures.addCallback(channelFuture, new FutureCallback<Channel>() {
-                    @Override public void onSuccess(Channel result) {
-                        if (connectionEstablished(result)) {
-                            setChannel(result);
-                            LOG.info("connection established to {}", dstAddressPrefixedName);
-                            connectionAttempts.set(0);
-                        }
-                        else {
-                            reconnectAgain(new RuntimeException("Returned channel was actually
not established"));
-                        }
-                    }
-
-                    @Override public void onFailure(Throwable t) {
-                        reconnectAgain(t);
-                    }
-
-                    private void reconnectAgain(Throwable t) {
-                        String baseMsg = String.format("connection attempt %s to %s failed",
connectionAttempts,
-                            dstAddressPrefixedName);
-                        String failureMsg = (t == null)? baseMsg : baseMsg + ": " + t.toString();
-                        LOG.error(failureMsg);
-                        long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(),
0);
-                        connect(nextDelayMs);
-                    }
-                });
-            }
-            else {
-                close();
-                throw new RuntimeException("Giving up to connect to " + dstAddressPrefixedName
+ " after " +
-                    connectionAttempts + " failed attempts");
-            }
-                    }
-        catch (Exception e) {
-            throw new RuntimeException("Failed to connect to " + dstAddressPrefixedName,
e);
-                }
-            }
-
-    private void setChannel(Channel channel) {
-                channelRef.set(channel);
+    private void scheduleConnect(long delayMs) {
+        scheduler.schedule(new Connector(dstAddress), delayMs, TimeUnit.MILLISECONDS);
     }
 
     private boolean reconnectingAllowed() {
@@ -336,11 +227,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public Status status() {
         if (closing) {
             return Status.Closed;
-        }
-        else if (!connectionEstablished(channelRef.get())) {
+        } else if (!connectionEstablished(channelRef.get())) {
             return Status.Connecting;
-            }
-        else {
+        } else {
             return Status.Ready;
         }
     }
@@ -367,14 +256,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
      */
     @Override
-    public synchronized void send(Iterator<TaskMessage> msgs) {
+    public void send(Iterator<TaskMessage> msgs) {
         if (closing) {
             int numMessages = iteratorSize(msgs);
             LOG.error("discarding {} messages because the Netty client to {} is being closed",
numMessages,
-                dstAddressPrefixedName);
+                    dstAddressPrefixedName);
             return;
         }
-        
+
         if (!hasMessages(msgs)) {
             return;
         }
@@ -387,52 +276,26 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
+        MessageBatch toSend = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
+
         // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
             TaskMessage message = msgs.next();
-            if (messageBatch == null) {
-                messageBatch = new MessageBatch(messageBatchSize);
-            }
-
-            messageBatch.add(message);
-            if (messageBatch.isFull()) {
-                MessageBatch toBeFlushed = messageBatch;
-                flushMessages(channel, toBeFlushed);
-                messageBatch = null;
+            toSend.add(message);
+            if (toSend.isFull()) {
+                flushMessages(channel, toSend);
+                toSend = new MessageBatch(messageBatchSize);
             }
         }
 
         // Handle any remaining messages in case the "last" batch was not full.
-        if (containsMessages(messageBatch)) {
-            if (connectionEstablished(channel) && channel.isWritable()) {
-                // We can write to the channel, so we flush the remaining messages immediately
to minimize latency.
-                pauseBackgroundFlushing();
-                MessageBatch toBeFlushed = messageBatch;
-                messageBatch = null;
-                flushMessages(channel, toBeFlushed);
-            }
-            else {
-                // We cannot write to the channel, which means Netty's internal write buffer
is full.
-                // In this case, we buffer the remaining messages and wait for the next messages
to arrive.
-                //
-                // Background:
-                // Netty 3.x maintains an internal write buffer with a high water mark for
each channel (default: 64K).
-                // This represents the amount of data waiting to be flushed to operating
system buffers.  If the
-                // outstanding data exceeds this value then the channel is set to non-writable.
 When this happens, a
-                // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to
writable again once the data
-                // has been flushed to the system buffers.
-                //
-                // See http://stackoverflow.com/questions/14049260
-                resumeBackgroundFlushing();
-                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
-            }
-        }
-                
-            }
+        flushMessages(channel, toSend);
+
+    }
 
     private boolean hasMessages(Iterator<TaskMessage> msgs) {
         return msgs != null && msgs.hasNext();
-        }
+    }
 
     /**
      * We will drop pending messages and let at-least-once message replay kick in.
@@ -463,32 +326,28 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         }
         return size;
     }
-    
+
     /**
      * Asynchronously writes the message batch to the channel.
-     * 
+     *
      * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    private synchronized void flushMessages(Channel channel, final MessageBatch batch) {
+    private void flushMessages(Channel channel, final MessageBatch batch) {
         if (!containsMessages(batch)) {
             return;
         }
 
         final int numMessages = batch.size();
-        pendingMessages.getAndAdd(numMessages);
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
-
             public void operationComplete(ChannelFuture future) throws Exception {
-                pendingMessages.getAndAdd(0 - numMessages);
                 if (future.isSuccess()) {
                     LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
                     messagesSent.getAndAdd(batch.size());
-                }
-                else {
+                } else {
                     LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
-                        future.getCause());
+                            future.getCause());
                     closeChannelAndReconnect(future.getChannel());
                     messagesLost.getAndAdd(numMessages);
                 }
@@ -496,20 +355,20 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         });
     }
-            
-    private synchronized void closeChannelAndReconnect(Channel channel) {
-                if (channel != null) {
+
+    private void closeChannelAndReconnect(Channel channel) {
+        if (channel != null) {
             channel.close();
             if (channelRef.compareAndSet(channel, null)) {
-                connect(NO_DELAY_MS);
+                scheduleConnect(NO_DELAY_MS);
             }
-                }
-            }
-        
+        }
+    }
+
     private boolean containsMessages(MessageBatch batch) {
         return batch != null && !batch.isEmpty();
     }
-            
+
     /**
      * Gracefully close this client.
      *
@@ -523,36 +382,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
             flushPendingMessages();
-            waitForPendingMessagesToBeSent();
             closeChannel();
         }
     }
-            
-    private synchronized void waitForPendingMessagesToBeSent() {
-        LOG.info("waiting up to {} ms to send {} pending messages to {}",
-            PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
-        long totalPendingMsgs = pendingMessages.get();
-        long startMs = nowMillis();
-        while (pendingMessages.get() != 0) {
-                try {
-                long deltaMs = nowMillis() - startMs;
-                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
-                    LOG.error("failed to send all pending messages to {} within timeout,
{} of {} messages were not " +
-                        "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
-                        break;
-                }
-                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
-                    }
-            catch (InterruptedException e) {
-                    break;
-                } 
-            }
-            
-    }
 
-    private synchronized void closeChannel() {
-        if (channelRef.get() != null) {
-            channelRef.get().close();
+    private void closeChannel() {
+        Channel channel = channelRef.get();
+        if (channel != null) {
+            channel.close();
             LOG.debug("channel to {} closed", dstAddressPrefixedName);
         }
     }
@@ -575,9 +412,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     private String srcAddressName() {
         String name = null;
-        Channel c = channelRef.get();
-        if (c != null) {
-            SocketAddress address = c.getLocalAddress();
+        Channel channel = channelRef.get();
+        if (channel != null) {
+            SocketAddress address = channel.getLocalAddress();
             if (address != null) {
                 name = address.toString();
             }
@@ -585,47 +422,66 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return name;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
     }
 
     /**
-     * Asynchronously establishes a Netty connection to the remote address, returning a Netty
Channel on success.
+     * Asynchronously establishes a Netty connection to the remote address
      */
-    private class Connector implements Callable<Channel> {
+    private class Connector implements Runnable {
 
         private final InetSocketAddress address;
-        private final int connectionAttempt;
 
-        public Connector(InetSocketAddress address, int connectionAttempt) {
+        public Connector(InetSocketAddress address) {
             this.address = address;
-            if (connectionAttempt < 1) {
-                throw new IllegalArgumentException("connection attempt must be >= 1 (you
provided " +
-                    connectionAttempt + ")");
-            }
-            this.connectionAttempt = connectionAttempt;
         }
 
-        @Override public Channel call() throws Exception {
-            LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
-            Channel channel = null;
-            ChannelFuture future = bootstrap.connect(address);
-            future.awaitUninterruptibly();
-            Channel current = future.getChannel();
-
-            if (future.isSuccess() && connectionEstablished(current)) {
-                channel = current;
-                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(),
channel.toString(),
-                    connectionAttempt);
+        private void reconnectAgain(Throwable t) {
+            String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                    dstAddressPrefixedName);
+            String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
+            LOG.error(failureMsg);
+            long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+            scheduleConnect(nextDelayMs);
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                if (reconnectingAllowed()) {
+                    int connectionAttempt = connectionAttempts.getAndIncrement();
+                    totalConnectionAttempts.getAndIncrement();
+
+                    LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                    ChannelFuture future = bootstrap.connect(address);
+                    future.awaitUninterruptibly();
+                    Channel newChannel = future.getChannel();
+
+                    if (future.isSuccess() && connectionEstablished(newChannel))
{
+                        boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                        checkState(setChannel);
+                        LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(),
newChannel.toString(),
+                                connectionAttempt);
+                    } else {
+                        Throwable cause = future.getCause();
+                        reconnectAgain(cause);
+                        if (newChannel != null) {
+                            newChannel.close();
+                        }
                     }
-            else {
-                LOG.debug("failed to connect to {} [attempt {}]", address.toString(), connectionAttempt);
-                if (current != null) {
-                    current.close();
+                } else {
+                    close();
+                    throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName
+ " after " +
+                            connectionAttempts + " failed attempts");
                 }
+            } catch (Throwable e) {
+                LOG.error("Uncaught throwable", e);
+                throw Throwables.propagate(e);
             }
-            return channel;
+        }
     }
-}
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ed8ab3ec/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 63c861a..169940f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -133,4 +133,8 @@ class MessageBatch {
         if (payload_len >0)
             bout.write(message.message());
     }
+
+    public ArrayList<TaskMessage> getMsgs() {
+        return msgs;
+    }
 }
\ No newline at end of file


Mime
View raw message