storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [06/12] git commit: Ensure we don't overflow the backoff value.
Date Sat, 07 Dec 2013 06:10:02 GMT
Ensure we don't overflow the backoff value.

The first attempt to fix this (213102b36f890) did not correctly address
the issue.  The 32 bit signed integer frequently overflows, resulting in
a bad value for Random.nextInt().

The default for storm.messaging.netty.max_retries is now 30 (instead of
100), and there is an upper limit of 30 for max_retries.

I also did a whitespace cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c638db0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c638db0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c638db0e

Branch: refs/heads/master
Commit: c638db0e88e3c56f808c8a76a88f94d7bf1988c4
Parents: 4e19589
Author: Brenden Matthews <brenden@diddyinc.com>
Authored: Wed Oct 30 09:41:13 2013 -0700
Committer: Brenden Matthews <brenden@diddyinc.com>
Committed: Wed Dec 4 14:30:52 2013 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +-
 .../backtype/storm/messaging/netty/Client.java  | 58 ++++++++++----------
 2 files changed, 29 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c638db0e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 08c7889..a5b31f4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -86,7 +86,7 @@ zmq.hwm: 0
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
-storm.messaging.netty.max_retries: 100
+storm.messaging.netty.max_retries: 30
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c638db0e/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
index 00431d4..91e4bd4 100644
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -23,14 +23,14 @@ import backtype.storm.utils.Utils;
 
 class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private final int max_retries; 
-    private final int base_sleep_ms; 
-    private final int max_sleep_ms; 
+    private final int max_retries;
+    private final int base_sleep_ms;
+    private final int max_sleep_ms;
     private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage
or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
     private InetSocketAddress remote_addr;
-    private AtomicInteger retries; 
+    private AtomicInteger retries;
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
@@ -38,14 +38,14 @@ class Client implements IConnection {
 
     @SuppressWarnings("rawtypes")
     Client(Map storm_conf, String host, int port) {
-        message_queue = new LinkedBlockingQueue<Object>(); 
+        message_queue = new LinkedBlockingQueue<Object>();
         retries = new AtomicInteger(0);
         channelRef = new AtomicReference<Channel>(null);
         being_closed = new AtomicBoolean(false);
 
-        // Configure 
+        // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,9 +74,9 @@ class Client implements IConnection {
     void reconnect() {
         try {
             int tried_count = retries.incrementAndGet();
-            if (tried_count < max_retries) {
+            if (tried_count <= max_retries) {
                 Thread.sleep(getSleepTimeMs());
-                LOG.info("Reconnect ... [{}]", tried_count);   
+                LOG.info("Reconnect ... [{}]", tried_count);
                 bootstrap.connect(remote_addr);
                 LOG.debug("connection started...");
             } else {
@@ -85,7 +85,7 @@ class Client implements IConnection {
             }
         } catch (InterruptedException e) {
             LOG.warn("connection failed", e);
-        } 
+        }
     }
 
     /**
@@ -93,19 +93,17 @@ class Client implements IConnection {
      */
     private int getSleepTimeMs()
     {
-        int backoff = 1 << Math.max(1, retries.get());
+        int backoff = 1 << retries.get();
         int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
-        if ( sleepMs < base_sleep_ms )
-          sleepMs = base_sleep_ms;
         return sleepMs;
     }
 
     /**
-     * Enqueue a task message to be sent to server 
+     * Enqueue a task message to be sent to server
      */
-    public void send(int task, byte[] message) {        
+    public void send(int task, byte[] message) {
         //throw exception if the client is being closed
         if (being_closed.get()) {
             throw new RuntimeException("Client is being closed, and does not take requests
any more");
@@ -128,43 +126,43 @@ class Client implements IConnection {
         MessageBatch batch = new MessageBatch(buffer_size);
         Object msg = message_queue.take();
         batch.add(msg);
-        
+
         //we will discard any message after CLOSE
-        if (msg==ControlMessage.CLOSE_MESSAGE) 
+        if (msg==ControlMessage.CLOSE_MESSAGE)
             return batch;
-        
+
         while (!batch.isFull()) {
             //peek the next message
             msg = message_queue.peek();
             //no more messages
             if (msg == null) break;
-            
+
             //we will discard any message after CLOSE
             if (msg==ControlMessage.CLOSE_MESSAGE) {
                 message_queue.take();
                 batch.add(msg);
                 break;
             }
-            
+
             //try to add this msg into batch
             if (!batch.tryAdd((TaskMessage) msg))
                 break;
-            
+
             //remove this message
             message_queue.take();
         }
 
         return batch;
     }
-    
+
     /**
      * gracefully close this client.
-     * 
+     *
      * We will send all existing requests, and then invoke close_n_release() method
      */
     public synchronized void close() {
-        if (!being_closed.get()) {  
-            //enqueue a CLOSE message so that shutdown() will be invoked 
+        if (!being_closed.get()) {
+            //enqueue a CLOSE message so that shutdown() will be invoked
             try {
                 message_queue.put(ControlMessage.CLOSE_MESSAGE);
                 being_closed.set(true);
@@ -178,10 +176,10 @@ class Client implements IConnection {
      * close_n_release() is invoked after all messages have been sent.
      */
     void  close_n_release() {
-        if (channelRef.get() != null) 
+        if (channelRef.get() != null)
             channelRef.get().close().awaitUninterruptibly();
 
-        //we need to release resources 
+        //we need to release resources
         new Thread(new Runnable() {
             @Override
             public void run() {
@@ -194,10 +192,10 @@ class Client implements IConnection {
     }
 
     void setChannel(Channel channel) {
-        channelRef.set(channel); 
-        //reset retries   
+        channelRef.set(channel);
+        //reset retries
         if (channel != null)
-            retries.set(0); 
+            retries.set(0);
     }
 
 }


Mime
View raw message