storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From knusb...@apache.org
Subject [3/5] storm git commit: Reuse netty client in case worker already has one.
Date Fri, 13 Mar 2015 18:34:07 GMT
Reuse netty client in case worker already has one.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b02f6124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b02f6124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b02f6124

Branch: refs/heads/master
Commit: b02f612498b6310d41c0d5fb15d92300c8179094
Parents: 14bc9ce
Author: Kishor Patil <kpatil@yahoo-inc.com>
Authored: Tue Mar 10 17:14:09 2015 +0000
Committer: Kishor Patil <kpatil@yahoo-inc.com>
Committed: Tue Mar 10 17:14:09 2015 +0000

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  2 +-
 .../backtype/storm/messaging/netty/Context.java | 26 +++++++++++++-------
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index dd8a196..945d986 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -510,7 +510,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject
{
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
-            context.removeClient(this);
+            context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
             context = null;
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 75a2d88..615702d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -24,9 +24,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
 
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
@@ -38,7 +37,7 @@ public class Context implements IContext {
         
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
-    private Set<IConnection> connections;
+    private Map<String, IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
     private ScheduledExecutorService clientScheduleService;
@@ -50,7 +49,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new HashSet<IConnection>();
+        connections = new HashMap<String, IConnection>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,7 +73,7 @@ public class Context implements IContext {
      */
     public synchronized IConnection bind(String storm_id, int port) {
         IConnection server = new Server(storm_conf, port);
-        connections.add(server);
+        connections.put(key(storm_id, port), server);
         return server;
     }
 
@@ -82,14 +81,19 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public synchronized IConnection connect(String storm_id, String host, int port) {
+        IConnection connection = connections.get(key(host,port));
+        if(connection !=null)
+        {
+            return connection;
+        }
         IConnection client =  new Client(storm_conf, clientChannelFactory, 
                 clientScheduleService, host, port, this);
-        connections.add(client);
+        connections.put(key(host, port), client);
         return client;
     }
 
-    synchronized void removeClient(Client c) {
-        connections.remove(c);
+    synchronized void removeClient(String host, int port) {
+        connections.remove(key(host, port));
     }
 
     /**
@@ -98,7 +102,7 @@ public class Context implements IContext {
     public synchronized void term() {
         clientScheduleService.shutdown();        
         
-        for (IConnection conn : new HashSet<IConnection>(connections)) {
+        for (IConnection conn : connections.values()) {
             conn.close();
         }
         
@@ -114,4 +118,8 @@ public class Context implements IContext {
         clientChannelFactory.releaseExternalResources();
 
     }
+
+    private String key(String host, int port) {
+        return String.format("%s:%d", host, port);
+    }
 }


Mime
View raw message