storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [03/50] [abbrv] git commit: STORM-297: Use a shared pool for netty client flusher threads.
Date Wed, 11 Jun 2014 16:09:35 GMT
STORM-297: Use a shared pool for netty client flusher threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/deba5583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/deba5583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/deba5583

Branch: refs/heads/security
Commit: deba55833b8752c5275cb37ac3a1043583790963
Parents: baa3106
Author: Sean Zhong <clockfly@gmail.com>
Authored: Tue May 20 20:46:11 2014 +0800
Committer: Sean Zhong <clockfly@gmail.com>
Committed: Tue May 20 21:19:37 2014 +0800

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 32 ++++++++++++--------
 .../backtype/storm/messaging/netty/Context.java | 30 +++++++++++++++---
 2 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/deba5583/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 59a8a5c..9765647 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -34,6 +34,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,10 +61,13 @@ public class Client implements IConnection {
     MessageBatch messageBatch = null;
     private AtomicLong flushCheckTimer;
     private int flushCheckInterval;
+    private ScheduledExecutorService scheduler;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+    Client(Map storm_conf, ChannelFactory factory, 
+            ScheduledExecutorService scheduler, String host, int port) {
         this.factory = factory;
+        this.scheduler = scheduler;
         channelRef = new AtomicReference<Channel>(null);
         closing = false;
         pendings = new AtomicLong(0);
@@ -92,12 +97,18 @@ public class Client implements IConnection {
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
         
-        Thread flushChecker = new Thread(new Runnable() {
+        // setup the connection asyncly now
+        scheduler.execute(new Runnable() {
             @Override
-            public void run() {
-                //make sure we have a connection
+            public void run() {   
                 connect();
-                
+            }
+        });
+        
+        Runnable flusher = new Runnable() {
+            @Override
+            public void run() {
+
                 while(!closing) {
                     long flushCheckTime = flushCheckTimer.get();
                     long now = System.currentTimeMillis();
@@ -107,18 +118,13 @@ public class Client implements IConnection {
                             flush(channel);
                         }
                     }
-                    try {
-                        Thread.sleep(flushCheckInterval);
-                    } catch (InterruptedException e) {
-                        break;
-                    }
                 }
                 
             }
-        }, name() + "-flush-checker");
+        };
         
-        flushChecker.setDaemon(true);
-        flushChecker.start();
+        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait
for 30s
+        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/deba5583/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 2e762ce..8f2b17f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,9 +18,12 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.Map;
 import java.util.Vector;
 
@@ -29,14 +32,16 @@ import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
 import backtype.storm.utils.Utils;
 
-import java.util.Map;
-import java.util.Vector;
-
 public class Context implements IContext {
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
+    
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
     /**
      * initialization per Storm configuration 
@@ -57,6 +62,10 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
+        
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS)) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -72,7 +81,8 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, host, port);
+        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+                clientScheduleService, host, port);
         connections.add(client);
         return client;
     }
@@ -81,12 +91,22 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
+        clientScheduleService.shutdown();        
+        
         for (IConnection conn : connections) {
             conn.close();
         }
+        
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e);
+        }
+        
         connections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
+
     }
 }


Mime
View raw message