storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [16/23] storm git commit: Forgot to commit
Date Tue, 14 Jul 2015 19:06:47 GMT
Forgot to commit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/167be7e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/167be7e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/167be7e7

Branch: refs/heads/0.9.x-branch
Commit: 167be7e79896178021df4e407d79e7e8d72fba44
Parents: afa638c
Author: Enno Shioji <eshioji@gmail.com>
Authored: Wed Jun 3 00:30:25 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Wed Jun 3 00:30:25 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java    | 18 +++---------------
 1 file changed, 3 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/167be7e7/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 3652886..5d9381e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -149,7 +149,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         // Dummy values to avoid null checks
         pendingMessage = new MessageBatch(messageBatchSize);
-        scheduler.scheduleWithFixedDelay(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+        scheduler.scheduleWithFixedDelay(new Flush(), 10, 10, TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -172,7 +172,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * We will retry connection with exponential back-off policy
      */
     private void scheduleConnect(long delayMs) {
-        scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
+        scheduler.schedule(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
     }
 
     private boolean reconnectingAllowed() {
@@ -290,20 +290,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         Batches batches;
         synchronized (pendingMessageLock) {
             batches = createBatches(pendingMessage, unfilled.getMsgs().iterator());
+            // We have a MessageBatch that isn't full yet, so we will wait for more messages.
             pendingMessage = batches.unfilled;
-
-            if(!pendingMessage.isEmpty()) {
-                // We have a MessageBatch that isn't full yet, so we will wait for more messages.
-                // However, we don't want to wait indefinitely so we schedule a timeout which flushes
-                // this batch if it's still not flushed after a delay
-
-                // First, cancel the currently pending flush, because we just saw that Netty's
-                // buffer is full and thus we know we can wait longer
-                pendingFlush.cancel();
-
-                // Schedule the new flush
-                pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
-            }
         }
 
         // MessageBatches that were filled are immediately handed to Netty


Mime
View raw message