storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [17/23] storm git commit: Revert the background flushing
Date Tue, 14 Jul 2015 19:06:48 GMT
Revert the background flushing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4104a0a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4104a0a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4104a0a9

Branch: refs/heads/0.9.x-branch
Commit: 4104a0a9006b57debb63f56c0dd21a44809aa380
Parents: 167be7e
Author: Enno Shioji <eshioji@gmail.com>
Authored: Wed Jun 3 12:56:54 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Wed Jun 3 12:56:54 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 209 +++++--------------
 .../backtype/storm/messaging/netty/Context.java |  20 +-
 .../storm/messaging/netty/MessageBatch.java     |   4 -
 3 files changed, 60 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4104a0a9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 5d9381e..2c7f3db 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -24,14 +24,13 @@ import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,16 +121,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final int messageBatchSize;
 
-    private final ScheduledExecutorService scheduler;
-
-    private final Object pendingMessageLock = new Object();
-    private MessageBatch pendingMessage;
-    private Timeout pendingFlush;
+    private final ListeningScheduledExecutorService scheduler;
 
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
-        this.scheduler = scheduler;
+        this.scheduler = MoreExecutors.listeningDecorator(scheduler);
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
         messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
@@ -146,10 +141,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         scheduleConnect(NO_DELAY_MS);
-
-        // Dummy values to avoid null checks
-        pendingMessage = new MessageBatch(messageBatchSize);
-        scheduler.scheduleWithFixedDelay(new Flush(), 10, 10, TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -172,7 +163,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * We will retry connection with exponential back-off policy
      */
     private void scheduleConnect(long delayMs) {
-        scheduler.schedule(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
+        scheduler.schedule(new Connector(dstAddress), delayMs, TimeUnit.MILLISECONDS);
     }
 
     private boolean reconnectingAllowed() {
@@ -239,110 +230,49 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-        Channel channel = getConnectedChannel();
-        if (channel == null) {
-            /*
-             * Connection is unavailable. We will drop pending messages and let at-least-once message replay kick in.
-             *
-             * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
-             * especially for topologies that disable message acking because we don't know whether the connection recovery will
-             * succeed  or not, and how long the recovery will take.
-             */
-            dropMessages(msgs);
-            return;
-        }
-
-        MessageBatch replacement = new MessageBatch(messageBatchSize);
-        MessageBatch previous;
-        synchronized (pendingMessageLock) {
-            // pendingMessage is never null
-            previous = pendingMessage;
-            pendingMessage = replacement;
-
-            // We are flushing the pending messages, therefore we can cancel the current pending flush
-            // The cancel is idempotent
-            pendingFlush.cancel();
-        }
-
-        // Collect messages into batches (to optimize network throughput)
-        Batches batches = createBatches(previous, msgs);
-
-        // Then flush the batches that are full
-        flushMessages(channel, batches.fullBatches);
-
-        if (batches.unfilled.isEmpty()) {
-            // All messages ended up neatly into batches; there are no unfilled MessageBatch
+        Channel channel = channelRef.get();
+        if (!connectionEstablished(channel)) {
+            // Closing the channel and reconnecting should be done before handling the messages.
+            boolean reconnectScheduled = closeChannelAndReconnect(channel);
+            if(reconnectScheduled){
+                // Log the connection error only once
+                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+            }
+            handleMessagesWhenConnectionIsUnavailable(msgs);
             return;
         }
 
-        if (channel.isWritable()) {
-            // Netty's internal buffer is not full. We should write the unfilled MessageBatch immediately
-            // to reduce latency
-            flushMessages(channel, batches.unfilled);
-        } else {
-            // We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
-            // In this situation, waiting for more messages before handing it to Netty yields better throughput
-            queueUp(channel, batches.unfilled);
-        }
-    }
+        MessageBatch toSend = new MessageBatch(messageBatchSize);
 
-    private void queueUp(Channel channel, MessageBatch unfilled) {
-        Batches batches;
-        synchronized (pendingMessageLock) {
-            batches = createBatches(pendingMessage, unfilled.getMsgs().iterator());
-            // We have a MessageBatch that isn't full yet, so we will wait for more messages.
-            pendingMessage = batches.unfilled;
-        }
-
-        // MessageBatches that were filled are immediately handed to Netty
-        flushMessages(channel, batches.fullBatches);
-
-    }
-
-
-    private static class Batches {
-        final List<MessageBatch> fullBatches;
-        final MessageBatch unfilled;
-
-        private Batches(List<MessageBatch> fullBatches, MessageBatch unfilled) {
-            this.fullBatches = fullBatches;
-            this.unfilled = unfilled;
-        }
-    }
-
-    private Batches createBatches(MessageBatch previous, Iterator<TaskMessage> msgs){
-        List<MessageBatch> ret = new ArrayList<MessageBatch>();
+        // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
             TaskMessage message = msgs.next();
-            previous.add(message);
-            if (previous.isFull()) {
-                ret.add(previous);
-                previous = new MessageBatch(messageBatchSize);
+            toSend.add(message);
+            if (toSend.isFull()) {
+                flushMessages(channel, toSend);
+                toSend = new MessageBatch(messageBatchSize);
             }
         }
 
-        return new Batches(ret, previous);
-    }
+        // Handle any remaining messages in case the "last" batch was not full.
+        flushMessages(channel, toSend);
 
-    private Channel getConnectedChannel() {
-        Channel channel = channelRef.get();
-        if (connectionEstablished(channel)) {
-            return channel;
-        } else {
-            // Closing the channel and reconnecting should be done before handling the messages.
-            boolean reconnectScheduled = closeChannelAndReconnect(channel);
-            if (reconnectScheduled) {
-                // Log the connection error only once
-                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
-            }
-            return null;
-        }
     }
 
     private boolean hasMessages(Iterator<TaskMessage> msgs) {
         return msgs != null && msgs.hasNext();
     }
 
+    /**
+     * We will drop pending messages and let at-least-once message replay kick in.
+     *
+     * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
+     * especially for topologies that disable message acking because we don't know whether the connection recovery will
+     * succeed  or not, and how long the recovery will take.
+     */
+    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
+        dropMessages(msgs);
+    }
 
     private void dropMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
@@ -350,10 +280,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         messagesLost.getAndAdd(msgCount);
     }
 
-    private void dropMessages(MessageBatch msgs) {
-        messagesLost.getAndAdd(msgs.size());
-    }
-
     private int iteratorSize(Iterator<TaskMessage> msgs) {
         int size = 0;
         if (msgs != null) {
@@ -365,23 +291,20 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return size;
     }
 
-    private void flushMessages(Channel channel, List<MessageBatch> batches) {
-        for (MessageBatch batch : batches) {
-            flushMessages(channel, batch);
-        }
-    }
-
-
     /**
      * Asynchronously writes the message batch to the channel.
      *
      * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
     private void flushMessages(Channel channel, final MessageBatch batch) {
+        if (!containsMessages(batch)) {
+            return;
+        }
+
+
         final int numMessages = batch.size();
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         pendingMessages.addAndGet(numMessages);
-
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) throws Exception {
@@ -402,7 +325,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to
-     * provide a replacement by successfully setting a null to the channel field
+     * provide a replacement
      * @param channel
      * @return if the call scheduled a re-connect task
      */
@@ -417,8 +340,15 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return false;
     }
 
+    private boolean containsMessages(MessageBatch batch) {
+        return batch != null && !batch.isEmpty();
+    }
+
     /**
      * Gracefully close this client.
+     *
+     * We will attempt to send any pending messages (i.e. messages currently buffered in memory) before closing the
+     * client.
      */
     @Override
     public void close() {
@@ -426,12 +356,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
-
             closeChannel();
         }
     }
 
-
     private void closeChannel() {
         Channel channel = channelRef.get();
         if (channel != null) {
@@ -474,52 +402,15 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     }
 
     /**
-     * Asynchronously flushes pending messages to the remote address, if they have not been
-     * flushed by other means.
-     * This task runs on a single thread shared among all clients, and thus
-     * should not perform operations that block or are expensive.
-     */
-    private class Flush implements Runnable {
-        @Override
-        public void run() {
-            try {
-                Channel channel = getConnectedChannel();
-                if (channel == null || !channel.isWritable()) {
-                    // Connection not available or buffer is full, no point in flushing
-                    return;
-                } else {
-                    // Connection is available and there is room in Netty's buffer
-                    MessageBatch toSend;
-                    synchronized (pendingMessageLock) {
-                        if(pendingMessage.isEmpty()){
-                            // Nothing to flush
-                            return;
-                        } else {
-                            toSend = pendingMessage;
-                            pendingMessage = new MessageBatch(messageBatchSize);
-                        }
-                    }
-                    checkState(!toSend.isFull(), "Filled batches should never be in pendingMessage field");
-
-                    flushMessages(channel, toSend);
-                }
-            }catch (Throwable e){
-                LOG.error("Uncaught throwable", e);
-                throw Throwables.propagate(e);
-            }
-        }
-    }
-
-    /**
      * Asynchronously establishes a Netty connection to the remote address
-     * This task runs on a single thread shared among all clients, and thus
+     * This task runs on a single (by default) thread shared among all clients, and thus
      * should not perform operations that block.
      */
-    private class Connect implements Runnable {
+    private class Connector implements Runnable {
 
         private final InetSocketAddress address;
 
-        public Connect(InetSocketAddress address) {
+        public Connector(InetSocketAddress address) {
             this.address = address;
         }
 
@@ -553,7 +444,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                                 checkState(setChannel);
                                 LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
                                         connectionAttempt);
-                                if (messagesLost.get() > 0) {
+                                if(messagesLost.get() > 0){
                                     LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
                                 }
                             } else {
@@ -571,7 +462,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                             connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
                 }
-            }catch (Throwable e){
+            } catch (Throwable e) {
                 LOG.error("Uncaught throwable", e);
                 throw Throwables.propagate(e);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/4104a0a9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 7e0cb0d..f592aff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -34,12 +34,12 @@ import backtype.storm.utils.Utils;
 
 public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-
+        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
-
+    
     private ScheduledExecutorService clientScheduleService;
     private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
@@ -53,7 +53,7 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
@@ -62,7 +62,7 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
-
+        
         int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
         int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
         clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
@@ -80,8 +80,8 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public IConnection connect(String storm_id, String host, int port) {
-        IConnection client =  new Client(storm_conf, clientChannelFactory,
+    public IConnection connect(String storm_id, String host, int port) {        
+        IConnection client =  new Client(storm_conf, clientChannelFactory, 
                 clientScheduleService, host, port);
         connections.add(client);
         return client;
@@ -91,18 +91,18 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
-        clientScheduleService.shutdown();
-
+        clientScheduleService.shutdown();        
+        
         for (IConnection conn : connections) {
             conn.close();
         }
-
+        
         try {
             clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             LOG.error("Error when shutting down client scheduler", e);
         }
-
+        
         connections = null;
 
         //we need to release resources associated with client channel factory

http://git-wip-us.apache.org/repos/asf/storm/blob/4104a0a9/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 169940f..63c861a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -133,8 +133,4 @@ class MessageBatch {
         if (payload_len >0)
             bout.write(message.message());
     }
-
-    public ArrayList<TaskMessage> getMsgs() {
-        return msgs;
-    }
 }
\ No newline at end of file


Mime
View raw message