storm-commits mailing list archives

From: bo...@apache.org
Subject: [14/23] storm git commit: Bring back the batching of in-between calls to send.
Date: Tue, 14 Jul 2015 19:06:45 GMT
Bring back the batching of in-between calls to send.
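
The batching works as follows: outgoing messages are accumulated into a pending MessageBatch under a lock, full batches are handed to Netty immediately, and a HashedWheelTimer timeout flushes an unfilled batch after a short delay so it is not held back indefinitely when no further send arrives. A minimal sketch of that pattern (illustration only, not the code in this commit; SimpleBatcher and its write() hook are hypothetical, and a plain List stands in for MessageBatch):

    import org.jboss.netty.util.HashedWheelTimer;
    import org.jboss.netty.util.Timeout;
    import org.jboss.netty.util.TimerTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    class SimpleBatcher {
        private final int batchSize;
        private final HashedWheelTimer timer;

        private final Object lock = new Object();
        private List<byte[]> pending = new ArrayList<byte[]>();   // never null
        private Timeout pendingFlush;                              // timed flush of the unfilled batch

        SimpleBatcher(int batchSize, HashedWheelTimer timer) {
            this.batchSize = batchSize;
            this.timer = timer;
        }

        void send(byte[] msg) {
            List<byte[]> full = null;
            synchronized (lock) {
                pending.add(msg);
                if (pending.size() >= batchSize) {
                    // Batch is full: hand it off right away and start a fresh one.
                    full = pending;
                    pending = new ArrayList<byte[]>();
                    if (pendingFlush != null) {
                        pendingFlush.cancel();                     // cancel is idempotent
                    }
                } else {
                    // Batch is still unfilled: (re)schedule a timed flush so the
                    // messages are not held back if no further send() arrives.
                    final List<byte[]> scheduledFor = pending;
                    if (pendingFlush != null) {
                        pendingFlush.cancel();
                    }
                    pendingFlush = timer.newTimeout(new TimerTask() {
                        public void run(Timeout timeout) {
                            flushIfStillPending(scheduledFor);
                        }
                    }, 10, TimeUnit.MILLISECONDS);
                }
            }
            if (full != null) {
                write(full);
            }
        }

        private void flushIfStillPending(List<byte[]> scheduledFor) {
            List<byte[]> toSend = null;
            synchronized (lock) {
                if (scheduledFor == pending) {                     // still the batch that scheduled this timeout
                    toSend = pending;
                    pending = new ArrayList<byte[]>();
                }
            }
            if (toSend != null && !toSend.isEmpty()) {
                write(toSend);
            }
        }

        private void write(List<byte[]> batch) {
            // In the real Client this is channel.write(MessageBatch), with dropped
            // messages counted when no connection is available.
        }
    }

The actual Client below additionally checks whether the channel is writable, and only defers the unfilled remainder when Netty's internal buffer is full.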


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/832b5db2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/832b5db2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/832b5db2

Branch: refs/heads/0.9.x-branch
Commit: 832b5db2ec5101601894d8484f0d1707d98331e6
Parents: 084c5a0
Author: Enno Shioji <eshioji@gmail.com>
Authored: Tue Jun 2 23:25:13 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Tue Jun 2 23:25:13 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 294 +++++++++++++------
 .../backtype/storm/messaging/netty/Context.java |  16 +-
 .../storm/messaging/netty/MessageBatch.java     |   4 +
 3 files changed, 214 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/832b5db2/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 2c7f3db..0d75448 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -23,14 +23,14 @@ import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -121,12 +120,16 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final int messageBatchSize;
 
-    private final ListeningScheduledExecutorService scheduler;
+    private final HashedWheelTimer scheduler;
+
+    private final Object pendingMessageLock = new Object();
+    private MessageBatch pendingMessage;
+    private Timeout pendingFlush;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
+    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
         closing = false;
-        this.scheduler = MoreExecutors.listeningDecorator(scheduler);
+        this.scheduler = scheduler;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port,
bufferSize);
         messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
262144);
@@ -141,6 +144,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         scheduleConnect(NO_DELAY_MS);
+
+        // Dummy values to avoid null checks
+        pendingMessage = new MessageBatch(messageBatchSize);
+        pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -163,7 +170,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * We will retry connection with exponential back-off policy
      */
     private void scheduleConnect(long delayMs) {
-        scheduler.schedule(new Connector(dstAddress), delayMs, TimeUnit.MILLISECONDS);
+        scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
     }
 
     private boolean reconnectingAllowed() {
@@ -230,49 +237,122 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-        Channel channel = channelRef.get();
-        if (!connectionEstablished(channel)) {
-            // Closing the channel and reconnecting should be done before handling the messages.
-            boolean reconnectScheduled = closeChannelAndReconnect(channel);
-            if(reconnectScheduled){
-                // Log the connection error only once
-                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
-            }
-            handleMessagesWhenConnectionIsUnavailable(msgs);
+        Channel channel = getConnectedChannel();
+        if (channel == null) {
+            /*
+             * Connection is unavailable. We will drop pending messages and let at-least-once message replay kick in.
+             *
+             * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
+             * especially for topologies that disable message acking because we don't know whether the connection recovery will
+             * succeed  or not, and how long the recovery will take.
+             */
+            dropMessages(msgs);
             return;
         }
 
-        MessageBatch toSend = new MessageBatch(messageBatchSize);
+        MessageBatch replacement = new MessageBatch(messageBatchSize);
+        MessageBatch previous;
+        synchronized (pendingMessageLock) {
+            // pendingMessage is never null
+            previous = pendingMessage;
+            pendingMessage = replacement;
+
+            // We are flushing the pending messages, therefore we can cancel the current pending flush
+            // The cancel is idempotent
+            pendingFlush.cancel();
+        }
+
+        // Collect messages into batches (to optimize network throughput)
+        Batches batches = createBatches(previous, msgs);
+
+        // Then flush the batches that are full
+        flushMessages(channel, batches.fullBatches);
+
+        if (batches.unfilled.isEmpty()) {
+            // All messages ended up neatly into batches; there are no unfilled MessageBatch
+            return;
+        }
+
+        if (channel.isWritable()) {
+            // Netty's internal buffer is not full. We should write the unfilled MessageBatch immediately
+            // to reduce latency
+            flushMessages(channel, batches.unfilled);
+        } else {
+            // We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
+            // In this situation, waiting for more messages before handing it to Netty yields better throughput
+            queueUp(channel, batches.unfilled);
+        }
+    }
+
+    private void queueUp(Channel channel, MessageBatch unfilled) {
+        Batches batches;
+        synchronized (pendingMessageLock) {
+            batches = createBatches(pendingMessage, unfilled.getMsgs().iterator());
+            pendingMessage = batches.unfilled;
+
+            if(!pendingMessage.isEmpty()) {
+                // We have a MessageBatch that isn't full yet, so we will wait for more messages.
+                // However, we don't want to wait indefinitely so we schedule a timeout which flushes
+                // this batch if it's still not flushed after a delay
 
-        // Collect messages into batches (to optimize network throughput), then flush them.
+                // First, cancel the currently pending flush, because we just saw that Netty's
+                // buffer is full and thus we know we can wait longer
+                pendingFlush.cancel();
+
+                // Schedule the new flush
+                pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        // MessageBatches that were filled are immediately handed to Netty
+        flushMessages(channel, batches.fullBatches);
+
+    }
+
+
+    private static class Batches {
+        final List<MessageBatch> fullBatches;
+        final MessageBatch unfilled;
+
+        private Batches(List<MessageBatch> fullBatches, MessageBatch unfilled) {
+            this.fullBatches = fullBatches;
+            this.unfilled = unfilled;
+        }
+    }
+
+    private Batches createBatches(MessageBatch previous, Iterator<TaskMessage> msgs){
+        List<MessageBatch> ret = new ArrayList<MessageBatch>();
         while (msgs.hasNext()) {
             TaskMessage message = msgs.next();
-            toSend.add(message);
-            if (toSend.isFull()) {
-                flushMessages(channel, toSend);
-                toSend = new MessageBatch(messageBatchSize);
+            previous.add(message);
+            if (previous.isFull()) {
+                ret.add(previous);
+                previous = new MessageBatch(messageBatchSize);
             }
         }
 
-        // Handle any remaining messages in case the "last" batch was not full.
-        flushMessages(channel, toSend);
+        return new Batches(ret, previous);
+    }
 
+    private Channel getConnectedChannel() {
+        Channel channel = channelRef.get();
+        if (connectionEstablished(channel)) {
+            return channel;
+        } else {
+            // Closing the channel and reconnecting should be done before handling the messages.
+            boolean reconnectScheduled = closeChannelAndReconnect(channel);
+            if (reconnectScheduled) {
+                // Log the connection error only once
+                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+            }
+            return null;
+        }
     }
 
     private boolean hasMessages(Iterator<TaskMessage> msgs) {
         return msgs != null && msgs.hasNext();
     }
 
-    /**
-     * We will drop pending messages and let at-least-once message replay kick in.
-     *
-     * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
-     * especially for topologies that disable message acking because we don't know whether the connection recovery will
-     * succeed  or not, and how long the recovery will take.
-     */
-    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
-        dropMessages(msgs);
-    }
 
     private void dropMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
@@ -280,6 +360,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         messagesLost.getAndAdd(msgCount);
     }
 
+    private void dropMessages(MessageBatch msgs) {
+        messagesLost.getAndAdd(msgs.size());
+    }
+
     private int iteratorSize(Iterator<TaskMessage> msgs) {
         int size = 0;
         if (msgs != null) {
@@ -291,20 +375,23 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return size;
     }
 
+    private void flushMessages(Channel channel, List<MessageBatch> batches) {
+        for (MessageBatch batch : batches) {
+            flushMessages(channel, batch);
+        }
+    }
+
+
     /**
      * Asynchronously writes the message batch to the channel.
      *
      * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
     private void flushMessages(Channel channel, final MessageBatch batch) {
-        if (!containsMessages(batch)) {
-            return;
-        }
-
-
         final int numMessages = batch.size();
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         pendingMessages.addAndGet(numMessages);
+
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) throws Exception {
@@ -325,7 +412,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to
-     * provide a replacement
+     * provide a replacement by successfully setting a null to the channel field
      * @param channel
      * @return if the call scheduled a re-connect task
      */
@@ -340,15 +427,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return false;
     }
 
-    private boolean containsMessages(MessageBatch batch) {
-        return batch != null && !batch.isEmpty();
-    }
-
     /**
      * Gracefully close this client.
-     *
-     * We will attempt to send any pending messages (i.e. messages currently buffered in memory) before closing the
-     * client.
      */
     @Override
     public void close() {
@@ -356,10 +436,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
+
             closeChannel();
         }
     }
 
+
     private void closeChannel() {
         Channel channel = channelRef.get();
         if (channel != null) {
@@ -402,15 +484,56 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     }
 
     /**
+     * Asynchronously flushes pending messages to the remote address, if they have not been
+     * flushed by other means.
+     * This task runs on a single thread shared among all clients, and thus
+     * should not perform operations that block or are expensive.
+     */
+    private class Flush implements TimerTask {
+        private final MessageBatch instructor;
+
+        private Flush(MessageBatch instructor) {
+            this.instructor = instructor;
+        }
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            MessageBatch toSend;
+            MessageBatch replacement = new MessageBatch(messageBatchSize);
+            synchronized (pendingMessageLock){
+                if(instructor == pendingMessage){
+                    // It's still the batch which scheduled this timeout
+                    toSend = pendingMessage;
+                    pendingMessage = replacement;
+                    checkState(!toSend.isFull(), "Only unfilled batches should get timeouts scheduled");
+                } else {
+                    // It's no longer the batch which scheduled this timeout
+                    // No need to work on this one
+                    toSend = null;
+                }
+            }
+
+            if(toSend!=null){
+                Channel channel = getConnectedChannel();
+                if(channel == null) {
+                    dropMessages(toSend);
+                } else {
+                    flushMessages(channel, toSend);
+                }
+            }
+        }
+    }
+
+    /**
      * Asynchronously establishes a Netty connection to the remote address
-     * This task runs on a single (by default) thread shared among all clients, and thus
+     * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block.
      */
-    private class Connector implements Runnable {
+    private class Connect implements TimerTask {
 
         private final InetSocketAddress address;
 
-        public Connector(InetSocketAddress address) {
+        public Connect(InetSocketAddress address) {
             this.address = address;
         }
 
@@ -425,46 +548,41 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
 
         @Override
-        public void run() {
-            try {
-                if (reconnectingAllowed()) {
-                    final int connectionAttempt = connectionAttempts.getAndIncrement();
-                    totalConnectionAttempts.getAndIncrement();
-
-                    LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
-                    ChannelFuture future = bootstrap.connect(address);
-                    future.addListener(new ChannelFutureListener() {
-                        @Override
-                        public void operationComplete(ChannelFuture future) throws Exception {
-                            // This call returns immediately
-                            Channel newChannel = future.getChannel();
-
-                            if (future.isSuccess() && connectionEstablished(newChannel)) {
-                                boolean setChannel = channelRef.compareAndSet(null, newChannel);
-                                checkState(setChannel);
-                                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
-                                        connectionAttempt);
-                                if(messagesLost.get() > 0){
-                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-                                }
-                            } else {
-                                Throwable cause = future.getCause();
-                                reschedule(cause);
-                                if (newChannel != null) {
-                                    newChannel.close();
-                                }
+        public void run(Timeout timeout) throws Exception {
+            if (reconnectingAllowed()) {
+                final int connectionAttempt = connectionAttempts.getAndIncrement();
+                totalConnectionAttempts.getAndIncrement();
+
+                LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                ChannelFuture future = bootstrap.connect(address);
+                future.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        // This call returns immediately
+                        Channel newChannel = future.getChannel();
+
+                        if (future.isSuccess() && connectionEstablished(newChannel)) {
+                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                            checkState(setChannel);
+                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                                    connectionAttempt);
+                            if (messagesLost.get() > 0) {
+                                LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                            }
+                        } else {
+                            Throwable cause = future.getCause();
+                            reschedule(cause);
+                            if (newChannel != null) {
+                                newChannel.close();
                             }
                         }
-                    });
-                } else {
-                    close();
-                    throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                            connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                    }
+                });
+            } else {
+                close();
+                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
+                        connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
-                }
-            } catch (Throwable e) {
-                LOG.error("Uncaught throwable", e);
-                throw Throwables.propagate(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/832b5db2/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index f592aff..64f67ba 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,6 +18,7 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
@@ -40,8 +41,7 @@ public class Context implements IContext {
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
-    private ScheduledExecutorService clientScheduleService;
-    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
+    private HashedWheelTimer clientScheduleService;
 
     /**
      * initialization per Storm configuration 
@@ -63,9 +63,7 @@ public class Context implements IContext {
                     Executors.newCachedThreadPool(workerFactory));
         }
         
-        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
-        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
-        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
+        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"), 10, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -91,18 +89,12 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
-        clientScheduleService.shutdown();        
+        clientScheduleService.stop();
         
         for (IConnection conn : connections) {
             conn.close();
         }
         
-        try {
-            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOG.error("Error when shutting down client scheduler", e);
-        }
-        
         connections = null;
 
         //we need to release resources associated with client channel factory

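The Context change above replaces the sized ScheduledExecutorService pool with a single HashedWheelTimer shared by all clients for reconnects and batch flushes. A minimal usage sketch of that timer (illustration only, assuming the Netty 3 org.jboss.netty.util API that this diff imports; the class and printed message are hypothetical):

    import org.jboss.netty.util.HashedWheelTimer;
    import org.jboss.netty.util.Timeout;
    import org.jboss.netty.util.TimerTask;

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class TimerSketch {
        public static void main(String[] args) throws InterruptedException {
            // One timer, one worker thread, coarse 10 ms ticks -- cheap enough to
            // share across all clients, which is why the thread-pool sizing goes away.
            HashedWheelTimer timer =
                    new HashedWheelTimer(Executors.defaultThreadFactory(), 10, TimeUnit.MILLISECONDS);

            Timeout timeout = timer.newTimeout(new TimerTask() {
                public void run(Timeout t) {
                    System.out.println("scheduled work (e.g. a reconnect or a batch flush)");
                }
            }, 100, TimeUnit.MILLISECONDS);

            // A timeout can be cancelled before it fires; cancel() is idempotent.
            // timeout.cancel();

            Thread.sleep(500);
            timer.stop();   // releases the timer thread, as Context.term() now does
        }
    }
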
http://git-wip-us.apache.org/repos/asf/storm/blob/832b5db2/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 63c861a..169940f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -133,4 +133,8 @@ class MessageBatch {
         if (payload_len >0)
             bout.write(message.message());
     }
+
+    public ArrayList<TaskMessage> getMsgs() {
+        return msgs;
+    }
 }
\ No newline at end of file

