storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [03/23] storm git commit: Remove background flushing because it doesn't seem necessary. Netty's Channel queues up written data on an unbounded buffer. The background flushing seems to have been added to avoid this, but in practice it was probably doing i
Date Tue, 14 Jul 2015 19:06:34 GMT
Remove background flushing because it doesn't seem necessary. Netty's Channel queues up written
data in an unbounded buffer. The background flushing seems to have been added to avoid this,
but in practice data was probably being queued there anyway, because flushMessages(), which is
called by send(), doesn't check isWritable. Moreover, queuing in an unbounded buffer seems fine
because back pressure is provided by MAX_PENDING_TUPLE. If an OOME occurs due to this buffer
overflowing, it seems reasonable that one has to reduce MAX_PENDING_TUPLE, rather than have Storm
try to cope with it by dropping messages.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7d84bdc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7d84bdc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7d84bdc

Branch: refs/heads/0.9.x-branch
Commit: b7d84bdc7fd3de34f45a94131cdbb6bfbd3763dc
Parents: 91b8eb3
Author: Enno Shioji <eshioji@gmail.com>
Authored: Thu May 28 22:27:31 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Thu May 28 22:27:31 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 31 +-------------------
 1 file changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b7d84bdc/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 4e97035..4bbe989 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -128,8 +128,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private final ListeningScheduledExecutorService scheduler;
     protected final Map stormConf;
 
-    private AtomicReference<MessageBatch> pendingMessageBatch;
-
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
@@ -145,8 +143,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
-        pendingMessageBatch = new AtomicReference<MessageBatch>(new MessageBatch(messageBatchSize));
-
         // Initiate connection to remote destination
         bootstrap = createClientBootstrap(factory, bufferSize);
         dstAddress = new InetSocketAddress(host, port);
@@ -155,8 +151,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         // Launch background flushing thread
         long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
-        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
-                TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -175,28 +169,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return "";
     }
 
-    private Runnable createBackgroundFlusher() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                if (!closing) {
-                    LOG.debug("flushing pending messages to {} in background", dstAddressPrefixedName);
-                    flushPendingMessages();
-                }
-            }
-        };
-    }
-
-    private void flushPendingMessages() {
-        Channel channel = channelRef.get();
-        if (connectionEstablished(channel)) {
-            MessageBatch toFlush = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
-            flushMessages(channel, toFlush);
-        } else {
-            closeChannelAndReconnect(channel);
-        }
-    }
-
     /**
      * We will retry connection with exponential back-off policy
      */
@@ -276,7 +248,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-        MessageBatch toSend = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
+        MessageBatch toSend = new MessageBatch(messageBatchSize);
 
         // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
@@ -381,7 +353,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
-            flushPendingMessages();
             closeChannel();
         }
     }


Mime
View raw message