storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/5] storm git commit: This fixes STORM-763 and STORM-839
Date Tue, 14 Jul 2015 19:06:00 GMT
Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 701d103f0 -> 56de46c3e


This fixes STORM-763 and STORM-839


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/286bacdf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/286bacdf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/286bacdf

Branch: refs/heads/0.10.x-branch
Commit: 286bacdf49937d1e8576eff27dfc887824ffdbbb
Parents: 90c5994
Author: Enno Shioji <eshioji@gmail.com>
Authored: Thu Jul 2 19:01:01 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Thu Jul 2 19:01:01 2015 +0100

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 -
 .../backtype/storm/messaging/netty/Client.java  | 469 +++++++------------
 .../backtype/storm/messaging/netty/Context.java |  23 +-
 .../storm/messaging/netty/MessageBatch.java     |  24 +-
 .../storm/messaging/netty/MessageBuffer.java    |  58 +++
 .../messaging/netty/SaslStormClientHandler.java |   4 +-
 .../messaging/netty/StormClientHandler.java     |  46 ++
 .../netty/StormClientPipelineFactory.java       |   4 +-
 8 files changed, 297 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4f7376b..a87d28d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -157,8 +157,6 @@ storm.messaging.netty.min_wait_ms: 100
 storm.messaging.netty.transfer.batch.size: 262144
 # Sets the backlog value to specify when the channel binds to a local address
 storm.messaging.netty.socket.backlog: 500
-# We check with this interval that whether the Netty channel is writable and try to write
pending messages if it is.
-storm.messaging.netty.flush.check.interval.ms: 10
 
 # By default, the Netty SASL authentication is set to false.  Users can override and set
it true for a specific topology.
 storm.messaging.netty.authentication: false

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f332bb3..9be2b06 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -17,36 +17,36 @@
  */
 package backtype.storm.messaging.netty;
 
+import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.*;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.ConnectionWithStatus;
-import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
-import backtype.storm.utils.Utils;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A Netty client for sending task messages to a remote destination (Netty server).
@@ -59,20 +59,16 @@ import backtype.storm.utils.Utils;
  * - Connecting and reconnecting are performed asynchronously.
  *     - Note: The current implementation drops any messages that are being enqueued for
sending if the connection to
  *       the remote destination is currently unavailable.
- * - A background flusher thread is run in the background.  It will, at fixed intervals,
check for any pending messages
- *   (i.e. messages buffered in memory) and flush them to the remote destination iff background
flushing is currently
- *   enabled.
  */
 public class Client extends ConnectionWithStatus implements IStatefulObject {
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
 
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
-    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
-    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
-    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
-    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
 
+    private final Map stormConf;
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
     private final InetSocketAddress dstAddress;
@@ -81,7 +77,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
     /**
      * The channel used for all write operations from this client to the remote destination.
      */
-    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
 
 
     /**
@@ -114,51 +110,24 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
      */
     private final AtomicLong pendingMessages = new AtomicLong(0);
 
+
     /**
      * This flag is set to true if and only if a client instance is being closed.
      */
     private volatile boolean closing = false;
 
-    /**
-     * When set to true, then the background flusher thread will flush any pending messages
on its next run.
-     */
-    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
-
-    /**
-     * The absolute time (in ms) when the next background flush should be performed.
-     *
-     * Note: The flush operation will only be performed if backgroundFlushingEnabled is true,
too.
-     */
-    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
-
-    /**
-     * The time interval (in ms) at which the background flusher thread will be run to check
for any pending messages
-     * to be flushed.
-     */
-    private final int flushCheckIntervalMs;
+    private final HashedWheelTimer scheduler;
 
-    /**
-     * How many messages should be batched together before sending them to the remote destination.
-     *
-     * Messages are batched to optimize network throughput at the expense of latency.
-     */
-    private final int messageBatchSize;
-
-    private MessageBatch messageBatch = null;
-    private final ListeningScheduledExecutorService scheduler;
-    protected final Map stormConf;
-    private Context context;
+    private final MessageBuffer batcher;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String
host, int port, Context context) {
-        this.context = context;
-        closing = false;
+    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host,
int port) {
         this.stormConf = stormConf;
-        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
+        closing = false;
+        this.scheduler = scheduler;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port,
bufferSize);
-        messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
262144);
-        flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS),
10);
+        int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
262144);
 
         maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
@@ -169,13 +138,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         bootstrap = createClientBootstrap(factory, bufferSize);
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
-        connect(NO_DELAY_MS);
-
-        // Launch background flushing thread
-        pauseBackgroundFlushing();
-        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
-        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
-            TimeUnit.MILLISECONDS);
+        scheduleConnect(NO_DELAY_MS);
+        batcher = new MessageBuffer(messageBatchSize);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize)
{
@@ -194,115 +158,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         return "";
     }
 
-    private Runnable createBackgroundFlusher() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                if(!closing && backgroundFlushingEnabled.get() && nowMillis()
> nextBackgroundFlushTimeMs.get()) {
-                    LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
-                        dstAddressPrefixedName);
-                    flushPendingMessages();
-                }
-            }
-        };
-    }
-
-    private void pauseBackgroundFlushing() {
-        backgroundFlushingEnabled.set(false);
-    }
-
-    private void resumeBackgroundFlushing() {
-        backgroundFlushingEnabled.set(true);
-    }
-
-    private synchronized void flushPendingMessages() {
-        Channel channel = channelRef.get();
-        if (containsMessages(messageBatch)) {
-            if (connectionEstablished(channel)) {
-                if (channel.isWritable()) {
-                    pauseBackgroundFlushing();
-                    MessageBatch toBeFlushed = messageBatch;
-                    flushMessages(channel, toBeFlushed);
-                    messageBatch = null;
-                }
-                else if (closing) {
-                    // Ensure background flushing is enabled so that we definitely have a
chance to re-try the flush
-                    // operation in case the client is being gracefully closed (where we
have a brief time window where
-                    // the client will wait for pending messages to be sent).
-                    resumeBackgroundFlushing();
-                }
-            }
-            else {
-                closeChannelAndReconnect(channel);
-            }
-        }
-    }
-
-    private long nowMillis() {
-        return System.currentTimeMillis();
-    }
-
     /**
      * We will retry connection with exponential back-off policy
      */
-    private synchronized void connect(long delayMs) {
-        try {
-            if (closing) {
-                LOG.info("not connecting to {} because this client is being closed", dstAddressPrefixedName);
-                return;
-            }
-
-            if (connectionEstablished(channelRef.get())) {
-                LOG.info("not connecting to {} because the connection is already established",
dstAddressPrefixedName);
-                return;
-            }
-
-            connectionAttempts.getAndIncrement();
-            if (reconnectingAllowed()) {
-                totalConnectionAttempts.getAndIncrement();
-                LOG.info("connection attempt {} to {} scheduled to run in {} ms", connectionAttempts.get(),
-                    dstAddressPrefixedName, delayMs);
-                ListenableFuture<Channel> channelFuture = scheduler.schedule(
-                    new Connector(dstAddress, connectionAttempts.get()), delayMs, TimeUnit.MILLISECONDS);
-                Futures.addCallback(channelFuture, new FutureCallback<Channel>() {
-                    @Override public void onSuccess(Channel result) {
-                        if (connectionEstablished(result)) {
-                            setChannel(result);
-                            LOG.info("connection established to {}", dstAddressPrefixedName);
-                            connectionAttempts.set(0);
-                        }
-                        else {
-                            reconnectAgain(new RuntimeException("Returned channel was actually
not established"));
-                        }
-                    }
-
-                    @Override public void onFailure(Throwable t) {
-                        reconnectAgain(t);
-                    }
-
-                    private void reconnectAgain(Throwable t) {
-                        String baseMsg = String.format("connection attempt %s to %s failed",
connectionAttempts,
-                            dstAddressPrefixedName);
-                        String failureMsg = (t == null)? baseMsg : baseMsg + ": " + t.toString();
-                        LOG.error(failureMsg);
-                        long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(),
0);
-                        connect(nextDelayMs);
-                    }
-                });
-            }
-            else {
-                close();
-                throw new RuntimeException("Giving up to connect to " + dstAddressPrefixedName
+ " after " +
-                    connectionAttempts + " failed attempts");
-            }
-        }
-        catch (Exception e) {
-            throw new RuntimeException("Failed to connect to " + dstAddressPrefixedName,
e);
-        }
-    }
-
-    private void setChannel(Channel channel) {
-        channelRef.set(channel);
+    private void scheduleConnect(long delayMs) {
+        scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
     }
 
     private boolean reconnectingAllowed() {
@@ -328,11 +188,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
     public Status status() {
         if (closing) {
             return Status.Closed;
-        }
-        else if (!connectionEstablished(channelRef.get())) {
+        } else if (!connectionEstablished(channelRef.get())) {
             return Status.Connecting;
-        }
-        else {
+        } else {
             return Status.Ready;
         }
     }
@@ -359,11 +217,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
      * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
      */
     @Override
-    public synchronized void send(Iterator<TaskMessage> msgs) {
+    public void send(Iterator<TaskMessage> msgs) {
         if (closing) {
             int numMessages = iteratorSize(msgs);
             LOG.error("discarding {} messages because the Netty client to {} is being closed",
numMessages,
-                dstAddressPrefixedName);
+                    dstAddressPrefixedName);
             return;
         }
 
@@ -371,78 +229,67 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
             return;
         }
 
-        Channel channel = channelRef.get();
-        if (!connectionEstablished(channel)) {
-            // Closing the channel and reconnecting should be done before handling the messages.
-            closeChannelAndReconnect(channel);
-            handleMessagesWhenConnectionIsUnavailable(msgs);
+        Channel channel = getConnectedChannel();
+        if (channel == null) {
+            /*
+             * Connection is unavailable. We will drop pending messages and let at-least-once
message replay kick in.
+             *
+             * Another option would be to buffer the messages in memory.  But this option
has the risk of causing OOM errors,
+             * especially for topologies that disable message acking because we don't know
whether the connection recovery will
+             * succeed  or not, and how long the recovery will take.
+             */
+            dropMessages(msgs);
             return;
         }
 
-        // Collect messages into batches (to optimize network throughput), then flush them.
+
         while (msgs.hasNext()) {
             TaskMessage message = msgs.next();
-            if (messageBatch == null) {
-                messageBatch = new MessageBatch(messageBatchSize);
+            MessageBatch full = batcher.add(message);
+            if(full != null){
+                flushMessages(channel, full);
             }
+        }
 
-            messageBatch.add(message);
-            if (messageBatch.isFull()) {
-                MessageBatch toBeFlushed = messageBatch;
-                flushMessages(channel, toBeFlushed);
-                messageBatch = null;
+        if(channel.isWritable()){
+            // Netty's internal buffer is not full and we still have message left in the
buffer.
+            // We should write the unfilled MessageBatch immediately to reduce latency
+            MessageBatch batch = batcher.drain();
+            if(batch != null) {
+                flushMessages(channel, batch);
             }
        } else {
            // Channel's buffer is full, meaning that we have time to wait for other messages
to arrive, and create a bigger
            // batch. This yields better throughput.
            // We can rely on `notifyInterestChanged` to push these messages as soon as there
is space in Netty's buffer
            // because we know `Channel.isWritable` was false after the messages were already
in the buffer.
        }
+    }
 
-        // Handle any remaining messages in case the "last" batch was not full.
-        if (containsMessages(messageBatch)) {
-            if (connectionEstablished(channel) && channel.isWritable()) {
-                // We can write to the channel, so we flush the remaining messages immediately
to minimize latency.
-                pauseBackgroundFlushing();
-                MessageBatch toBeFlushed = messageBatch;
-                messageBatch = null;
-                flushMessages(channel, toBeFlushed);
-            }
-            else {
-                // We cannot write to the channel, which means Netty's internal write buffer
is full.
-                // In this case, we buffer the remaining messages and wait for the next messages
to arrive.
-                //
-                // Background:
-                // Netty 3.x maintains an internal write buffer with a high water mark for
each channel (default: 64K).
-                // This represents the amount of data waiting to be flushed to operating
system buffers.  If the
-                // outstanding data exceeds this value then the channel is set to non-writable.
 When this happens, a
-                // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to
writable again once the data
-                // has been flushed to the system buffers.
-                //
-                // See http://stackoverflow.com/questions/14049260
-                resumeBackgroundFlushing();
-                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
+    private Channel getConnectedChannel() {
+        Channel channel = channelRef.get();
+        if (connectionEstablished(channel)) {
+            return channel;
+        } else {
+            // Closing the channel and reconnecting should be done before handling the messages.
+            boolean reconnectScheduled = closeChannelAndReconnect(channel);
+            if (reconnectScheduled) {
+                // Log the connection error only once
+                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
             }
+            return null;
         }
-
     }
 
     private boolean hasMessages(Iterator<TaskMessage> msgs) {
         return msgs != null && msgs.hasNext();
     }
 
-    /**
-     * We will drop pending messages and let at-least-once message replay kick in.
-     *
-     * Another option would be to buffer the messages in memory.  But this option has the
risk of causing OOM errors,
-     * especially for topologies that disable message acking because we don't know whether
the connection recovery will
-     * succeed  or not, and how long the recovery will take.
-     */
-    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs)
{
-        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
-        dropMessages(msgs);
-    }
 
     private void dropMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-        LOG.error("dropping {} message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
@@ -461,26 +308,25 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
      *
      * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    private synchronized void flushMessages(Channel channel, final MessageBatch batch) {
-        if (!containsMessages(batch)) {
+    private void flushMessages(Channel channel, final MessageBatch batch) {
+        if(batch.isEmpty()){
             return;
         }
-
+        
         final int numMessages = batch.size();
-        pendingMessages.getAndAdd(numMessages);
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
+        pendingMessages.addAndGet(numMessages);
+
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
-
             public void operationComplete(ChannelFuture future) throws Exception {
-                pendingMessages.getAndAdd(0 - numMessages);
+                pendingMessages.addAndGet(0 - numMessages);
                 if (future.isSuccess()) {
                     LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
                     messagesSent.getAndAdd(batch.size());
-                }
-                else {
+                } else {
                     LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
-                        future.getCause());
+                            future.getCause());
                     closeChannelAndReconnect(future.getChannel());
                     messagesLost.getAndAdd(numMessages);
                 }
@@ -489,52 +335,48 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         });
     }
 
-    private synchronized void closeChannelAndReconnect(Channel channel) {
+    /**
+     * Schedule a reconnect if we closed a non-null channel, and acquired the right to
+     * provide a replacement by successfully setting a null to the channel field
+     * @param channel
+     * @return if the call scheduled a re-connect task
+     */
+    private boolean closeChannelAndReconnect(Channel channel) {
         if (channel != null) {
-            LOG.info("closing channel {} to {}", channel.toString(), dstAddressPrefixedName);
             channel.close();
             if (channelRef.compareAndSet(channel, null)) {
-                LOG.info("triggering reconnect to {}", dstAddressPrefixedName);
-                connect(NO_DELAY_MS);
+                scheduleConnect(NO_DELAY_MS);
+                return true;
             }
         }
-    }
-
-    private boolean containsMessages(MessageBatch batch) {
-        return batch != null && !batch.isEmpty();
+        return false;
     }
 
     /**
      * Gracefully close this client.
-     *
-     * We will attempt to send any pending messages (i.e. messages currently buffered in
memory) before closing the
-     * client.
      */
     @Override
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
-            context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
-            context = null;
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
-            flushPendingMessages();
             waitForPendingMessagesToBeSent();
             closeChannel();
         }
     }
 
-    private synchronized void waitForPendingMessagesToBeSent() {
+    private void waitForPendingMessagesToBeSent() {
         LOG.info("waiting up to {} ms to send {} pending messages to {}",
-            PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
         long totalPendingMsgs = pendingMessages.get();
-        long startMs = nowMillis();
+        long startMs = System.currentTimeMillis();
         while (pendingMessages.get() != 0) {
             try {
-                long deltaMs = nowMillis() - startMs;
+                long deltaMs = System.currentTimeMillis() - startMs;
                 if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                     LOG.error("failed to send all pending messages to {} within timeout,
{} of {} messages were not " +
-                        "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                            "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                     break;
                 }
                 Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
@@ -546,9 +388,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
 
     }
 
-    private synchronized void closeChannel() {
-        if (channelRef.get() != null) {
-            channelRef.get().close();
+
+    private void closeChannel() {
+        Channel channel = channelRef.get();
+        if (channel != null) {
+            channel.close();
             LOG.debug("channel to {} closed", dstAddressPrefixedName);
         }
     }
@@ -569,11 +413,15 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         return ret;
     }
 
+    public Map getStormConf() {
+        return stormConf;
+    }
+
     private String srcAddressName() {
         String name = null;
-        Channel c = channelRef.get();
-        if (c != null) {
-            SocketAddress address = c.getLocalAddress();
+        Channel channel = channelRef.get();
+        if (channel != null) {
+            SocketAddress address = channel.getLocalAddress();
             if (address != null) {
                 name = address.toString();
             }
@@ -581,46 +429,83 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         return name;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
     }
 
     /**
-     * Asynchronously establishes a Netty connection to the remote address, returning a Netty
Channel on success.
+     * Called by Netty thread on change in channel interest
+     * @param channel
+     */
+    public void notifyInterestChanged(Channel channel) {
+        if(channel.isWritable()){
+            // Channel is writable again, write if there are any messages pending
+            MessageBatch pending = batcher.drain();
+            flushMessages(channel, pending);
+        }
+    }
+
+    /**
+     * Asynchronously establishes a Netty connection to the remote address
+     * This task runs on a single thread shared among all clients, and thus
+     * should not perform operations that block.
      */
-    private class Connector implements Callable<Channel> {
+    private class Connect implements TimerTask {
 
         private final InetSocketAddress address;
-        private final int connectionAttempt;
 
-        public Connector(InetSocketAddress address, int connectionAttempt) {
+        public Connect(InetSocketAddress address) {
             this.address = address;
-            if (connectionAttempt < 1) {
-                throw new IllegalArgumentException("connection attempt must be >= 1 (you
provided " +
-                    connectionAttempt + ")");
-            }
-            this.connectionAttempt = connectionAttempt;
         }
 
-        @Override public Channel call() throws Exception {
-            LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
-            Channel channel = null;
-            ChannelFuture future = bootstrap.connect(address);
-            future.awaitUninterruptibly();
-            Channel current = future.getChannel();
-
-            if (future.isSuccess() && connectionEstablished(current)) {
-                channel = current;
-                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(),
channel.toString(),
-                    connectionAttempt);
-            }
-            else {
-                LOG.debug("failed to connect to {} [attempt {}]", address.toString(), connectionAttempt);
-                if (current != null) {
-                    current.close();
-                }
+        private void reschedule(Throwable t) {
+            String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                    dstAddressPrefixedName);
+            String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
+            LOG.error(failureMsg);
+            long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+            scheduleConnect(nextDelayMs);
+        }
+
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            if (reconnectingAllowed()) {
+                final int connectionAttempt = connectionAttempts.getAndIncrement();
+                totalConnectionAttempts.getAndIncrement();
+
+                LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                ChannelFuture future = bootstrap.connect(address);
+                future.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception
{
+                        // This call returns immediately
+                        Channel newChannel = future.getChannel();
+
+                        if (future.isSuccess() && connectionEstablished(newChannel))
{
+                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                            checkState(setChannel);
+                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(),
newChannel.toString(),
+                                    connectionAttempt);
+                            if (messagesLost.get() > 0) {
+                                LOG.warn("Re-connection to {} was successful but {} messages
has been lost so far", address.toString(), messagesLost.get());
+                            }
+                        } else {
+                            Throwable cause = future.getCause();
+                            reschedule(cause);
+                            if (newChannel != null) {
+                                newChannel.close();
+                            }
+                        }
+                    }
+                });
+            } else {
+                close();
+                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName
+ " after " +
+                        connectionAttempts + " failed attempts. " + messagesLost.get() +
" messages were lost");
+
             }
-            return channel;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 615702d..2baad0e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,6 +18,7 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
@@ -40,8 +41,7 @@ public class Context implements IContext {
     private Map<String, IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
-    private ScheduledExecutorService clientScheduleService;
-    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
+    private HashedWheelTimer clientScheduleService;
 
     /**
      * initialization per Storm configuration 
@@ -55,6 +55,7 @@ public class Context implements IContext {
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
 		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
+        // TODO investigate impact of having one worker
         if (maxWorkers > 0) {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory), maxWorkers);
@@ -63,9 +64,7 @@ public class Context implements IContext {
                     Executors.newCachedThreadPool(workerFactory));
         }
         
-        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
-        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
-        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
+        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -87,7 +86,7 @@ public class Context implements IContext {
             return connection;
         }
         IConnection client =  new Client(storm_conf, clientChannelFactory, 
-                clientScheduleService, host, port, this);
+                clientScheduleService, host, port);
         connections.put(key(host, port), client);
         return client;
     }
@@ -100,18 +99,12 @@ public class Context implements IContext {
      * terminate this context
      */
     public synchronized void term() {
-        clientScheduleService.shutdown();        
-        
+        clientScheduleService.stop();
+
         for (IConnection conn : connections.values()) {
             conn.close();
         }
-        
-        try {
-            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOG.error("Error when shutting down client scheduler", e);
-        }
-        
+
         connections = null;
 
         //we need to release resources associated with client channel factory

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 63c861a..ec0dc0f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -35,32 +35,15 @@ class MessageBatch {
         encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
     }
 
-    void add(TaskMessage obj) {
-        if (obj == null)
-            throw new RuntimeException("null object forbidded in message batch");
+    void add(TaskMessage msg) {
+        if (msg == null)
+            throw new RuntimeException("null object forbidden in message batch");
 
-        TaskMessage msg = (TaskMessage)obj;
         msgs.add(msg);
         encoded_length += msgEncodeLength(msg);
     }
 
 
-    TaskMessage get(int index) {
-        return msgs.get(index);
-    }
-
-    /**
-     * try to add a TaskMessage to a batch
-     * @param taskMsg
-     * @return false if the msg could not be added due to buffer size limit; true otherwise
-     */
-    boolean tryAdd(TaskMessage taskMsg) {
-        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
-            return false;
-        add(taskMsg);
-        return true;
-    }
-
     private int msgEncodeLength(TaskMessage taskMsg) {
         if (taskMsg == null) return 0;
 
@@ -133,4 +116,5 @@ class MessageBatch {
         if (payload_len >0)
             bout.write(message.message());
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
new file mode 100644
index 0000000..d485e3a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Encapsulates the state used for batching up messages.
+ */
+public class MessageBuffer {
+    private final int messageBatchSize;
+    private MessageBatch currentBatch;
+
+    public MessageBuffer(int messageBatchSize){
+        this.messageBatchSize = messageBatchSize;
+        this.currentBatch = new MessageBatch(messageBatchSize);
+    }
+
+    public synchronized MessageBatch add(TaskMessage msg){
+        currentBatch.add(msg);
+        if(currentBatch.isFull()){
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(messageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+
+    public synchronized boolean isEmpty() {
+        return currentBatch.isEmpty();
+    }
+
+    public synchronized MessageBatch drain() {
+        if(!currentBatch.isEmpty()) {
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(messageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index 32ecb40..12b466c 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -146,8 +146,8 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler
{
     }
 
     private void getSASLCredentials() throws IOException {
-        topologyName = (String) this.client.stormConf.get(Config.TOPOLOGY_NAME);
-        String secretKey = SaslUtils.getSecretKey(this.client.stormConf);
+        topologyName = (String) this.client.getStormConf().get(Config.TOPOLOGY_NAME);
+        String secretKey = SaslUtils.getSecretKey(this.client.getStormConf());
         if (secretKey != null) {
             token = secretKey.getBytes();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..2d25001
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.net.ConnectException;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+    private Client client;
+    
+    StormClientHandler(Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws
Exception {
+        client.notifyInterestChanged(e.getChannel());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        Throwable cause = event.getCause();
+        if (!(cause instanceof ConnectException)) {
+            LOG.info("Connection failed " + client.dstAddressPrefixedName, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/286bacdf/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 2adfceb..4be06cd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -39,14 +39,14 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
 
-        boolean isNettyAuth = (Boolean) this.client.stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        boolean isNettyAuth = (Boolean) this.client.getStormConf().get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
             pipeline.addLast("saslClientHandler", new SaslStormClientHandler(
                     client));
         }
         // business logic.
-        pipeline.addLast("handler", new StormClientErrorHandler(client.dstAddressPrefixedName));
+        pipeline.addLast("handler", new StormClientHandler(client));
 
         return pipeline;
     }


Mime
View raw message