storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From knusb...@apache.org
Subject [1/5] storm git commit: STORM-625: don't leak netty clients when workers move
Date Fri, 13 Mar 2015 18:34:05 GMT
Repository: storm
Updated Branches:
  refs/heads/master e77a300c5 -> 87a662730


STORM-625: don't leak netty clients when workers move


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5be9e9d9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5be9e9d9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5be9e9d9

Branch: refs/heads/master
Commit: 5be9e9d9cfe6fb497a7706308cc9c59061215cea
Parents: 2dbeb98
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Tue Mar 3 16:34:53 2015 -0600
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Tue Mar 3 16:34:53 2015 -0600

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  6 +++++-
 .../backtype/storm/messaging/netty/Context.java | 21 ++++++++++++--------
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 7392d3e..dd8a196 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -147,9 +147,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private MessageBatch messageBatch = null;
     private final ListeningScheduledExecutorService scheduler;
     protected final Map stormConf;
+    private Context context;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, Context context) {
+        this.context = context;
         closing = false;
         this.stormConf = stormConf;
         this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
@@ -508,6 +510,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+            context.removeClient(this);
+            context = null;
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
             flushPendingMessages();

http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index f592aff..75a2d88 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -25,7 +25,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
-import java.util.Vector;
+import java.util.Set;
+import java.util.HashSet;
 
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
@@ -37,7 +38,7 @@ public class Context implements IContext {
         
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
-    private volatile Vector<IConnection> connections;
+    private Set<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
     private ScheduledExecutorService clientScheduleService;
@@ -49,7 +50,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new Vector<IConnection>();
+        connections = new HashSet<IConnection>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -71,7 +72,7 @@ public class Context implements IContext {
     /**
      * establish a server with a binding port
      */
-    public IConnection bind(String storm_id, int port) {
+    public synchronized IConnection bind(String storm_id, int port) {
         IConnection server = new Server(storm_conf, port);
         connections.add(server);
         return server;
@@ -80,20 +81,24 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public IConnection connect(String storm_id, String host, int port) {        
+    public synchronized IConnection connect(String storm_id, String host, int port) {
         IConnection client =  new Client(storm_conf, clientChannelFactory, 
-                clientScheduleService, host, port);
+                clientScheduleService, host, port, this);
         connections.add(client);
         return client;
     }
 
+    synchronized void removeClient(Client c) {
+        connections.remove(c);
+    }
+
     /**
      * terminate this context
      */
-    public void term() {
+    public synchronized void term() {
         clientScheduleService.shutdown();        
         
-        for (IConnection conn : connections) {
+        for (IConnection conn : new HashSet<IConnection>(connections)) {
             conn.close();
         }
         


Mime
View raw message