storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: STORM-2532: Remove uses of Utils.getAvailablePort where possible
Date Fri, 02 Jun 2017 17:16:25 GMT
Repository: storm
Updated Branches:
  refs/heads/master 138770d47 -> 5befe2746


STORM-2532: Remove uses of Utils.getAvailablePort where possible


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01b85647
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01b85647
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01b85647

Branch: refs/heads/master
Commit: 01b856479ab147fc148c928a500dbea4251fbf12
Parents: 30cbae5
Author: Stig Rohde Døssing <sdo@it-minds.dk>
Authored: Sat Apr 22 14:14:11 2017 +0200
Committer: Stig Rohde Døssing <sdo@it-minds.dk>
Committed: Thu May 25 17:54:48 2017 +0200

----------------------------------------------------------------------
 .../org/apache/storm/messaging/IConnection.java |  6 ++
 .../org/apache/storm/messaging/IContext.java    |  2 +-
 .../apache/storm/messaging/local/Context.java   | 17 +++-
 .../apache/storm/messaging/netty/Client.java    |  5 ++
 .../apache/storm/messaging/netty/Context.java   |  4 +-
 .../apache/storm/messaging/netty/Server.java    |  7 ++
 .../src/jvm/org/apache/storm/utils/Utils.java   | 18 ++++-
 .../apache/storm/messaging/netty_unit_test.clj  | 21 ++---
 .../storm/security/auth/nimbus_auth_test.clj    | 81 +++++++++-----------
 .../java/org/apache/storm/LocalCluster.java     |  4 +
 10 files changed, 102 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index 7042dc3..5d097d7 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -55,6 +55,12 @@ public interface IConnection {
      * @return a Load for each of the tasks it knows about.
      */
     public Map<Integer, Load> getLoad(Collection<Integer> tasks);
+    
+    /**
+     * Get the port for this connection
+     * @return The port this connection is using
+     */
+    public int getPort();
 
     /**
      * close this connection

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 97148ce..c5c2261 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -36,7 +36,7 @@ public interface IContext {
     public void prepare(Map<String, Object> topoConf);
     
     /**
-     * This method is invoked when a worker is unload a messaging plugin
+     * This method is invoked when a worker is unloading a messaging plugin
      */
     public void term();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 06318b8..23e934a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -45,6 +45,11 @@ public class Context implements IContext {
     private static class LocalServer implements IConnection {
         volatile IConnectionCallback _cb;
         final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
+        final int port;
+        
+        public LocalServer(int port) {
+            this.port = port;
+        }
 
         @Override
         public void registerRecv(IConnectionCallback cb) {
@@ -77,6 +82,11 @@ public class Context implements IContext {
         public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
             _load.putAll(taskToLoad);
         }
+
+        @Override
+        public int getPort() {
+            return port;
+        }
  
         @Override
         public void close() {
@@ -168,6 +178,11 @@ public class Context implements IContext {
         public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
             _server.sendLoadMetrics(taskToLoad);
         }
+
+        @Override
+        public int getPort() {
+            return _server.getPort();
+        }
  
         @Override
         public void close() {
@@ -185,7 +200,7 @@ public class Context implements IContext {
         String key = nodeId + "-" + port;
         LocalServer ret = _registry.get(key);
         if (ret == null) {
-            ret = new LocalServer();
+            ret = new LocalServer(port);
             LocalServer tmp = _registry.putIfAbsent(key, ret);
             if (tmp != null) {
                 ret = tmp;

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 52fdda8..dd07d60 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -414,6 +414,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject,
ISa
         return false;
     }
 
+    @Override
+    public int getPort() {
+        return dstAddress.getPort();
+    }
+    
     /**
      * Gracefully close this client.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 10ffc83..486cd03 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -65,7 +65,7 @@ public class Context implements IContext {
      */
     public synchronized IConnection bind(String storm_id, int port) {
         IConnection server = new Server(topoConf, port);
-        connections.put(key(storm_id, port), server);
+        connections.put(key(storm_id, server.getPort()), server);
         return server;
     }
 
@@ -80,7 +80,7 @@ public class Context implements IContext {
         }
         IConnection client =  new Client(topoConf, clientChannelFactory, 
                 clientScheduleService, host, port, this);
-        connections.put(key(host, port), client);
+        connections.put(key(host, client.getPort()), client);
         return client;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 376006f..91ef702 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -64,6 +64,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
     private KryoValuesSerializer _ser;
     private IConnectionCallback _cb = null; 
+    private final int boundPort;
     
     @SuppressWarnings("rawtypes")
     Server(Map<String, Object> topoConf, int port) {
@@ -100,6 +101,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject,
ISaslServe
 
         // Bind and start to accept incoming connections.
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
+        boundPort = ((InetSocketAddress)channel.getLocalAddress()).getPort();
         allChannels.add(channel);
     }
     
@@ -156,6 +158,11 @@ class Server extends ConnectionWithStatus implements IStatefulObject,
ISaslServe
         allChannels.remove(channel);
     }
 
+    @Override
+    public int getPort() {
+        return boundPort;
+    }
+    
     /**
      * close all channels, and release resources
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 45bd123..bc1426f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1166,18 +1166,30 @@ public class Utils {
         return null;
     }
 
-    public static int getAvailablePort(int prefferedPort) {
+    /**
+     * Gets an available port. Consider if it is possible to pass port 0 to the
+     * server instead of using this method, since there is no guarantee that the
+     * port returned by this method will remain free.
+     *
+     * @param preferredPort
+     * @return The preferred port if available, or a random available port
+     */
+    public static int getAvailablePort(int preferredPort) {
         int localPort = -1;
-        try(ServerSocket socket = new ServerSocket(prefferedPort)) {
+        try(ServerSocket socket = new ServerSocket(preferredPort)) {
             localPort = socket.getLocalPort();
         } catch(IOException exp) {
-            if (prefferedPort > 0) {
+            if (preferredPort > 0) {
                 return getAvailablePort(0);
             }
         }
         return localPort;
     }
 
+    /**
+     * Shortcut to calling {@link #getAvailablePort(int) } with 0 as the preferred port
+     * @return A random available port
+     */
     public static int getAvailablePort() {
         return getAvailablePort(0);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
index 2833255..ffbd0cd 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj
@@ -22,7 +22,6 @@
   (:import [java.util ArrayList]
            (org.apache.storm.daemon.worker WorkerState)))
 
-(def port (Utils/getAvailablePort))
 (def task 1)
 
 ;; In a "real" cluster (or an integration test), Storm itself would ensure that a topology's
workers would only be
@@ -69,11 +68,10 @@
   (log-message "1. Should send and receive a basic message")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         context (TransportFactory/makeContext storm-conf)
-        port (Utils/getAvailablePort (int 6700))
         resp (atom nil)
-        server (.bind context nil port)
+        server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" port)
+        client (.connect context nil "localhost" (.getPort server))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))]
     (wait-for-not-nil resp)
@@ -107,11 +105,10 @@
   (log-message "2 test load")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         context (TransportFactory/makeContext storm-conf)
-        port (Utils/getAvailablePort (int 6700))
         resp (atom nil)
-        server (.bind context nil port)
+        server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" port)
+        client (.connect context nil "localhost" (.getPort server))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         _ (.sendLoadMetrics server {(int 1) 0.0 (int 2) 1.0})
@@ -150,11 +147,10 @@
   (log-message "3 Should send and receive a large message")
   (let [req_msg (apply str (repeat 2048000 'c'))
         context (TransportFactory/makeContext storm-conf)
-        port (Utils/getAvailablePort (int 6700))
         resp (atom nil)
-        server (.bind context nil port)
+        server (.bind context nil 0)
         _ (register-callback (fn [message] (reset! resp message)) server)
-        client (.connect context nil "localhost" port)
+        client (.connect context nil "localhost" (.getPort server))
         _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))]
     (wait-for-not-nil resp)
@@ -238,10 +234,9 @@
         resp (ArrayList.)
         received (atom 0)
         context (TransportFactory/makeContext storm-conf)
-        port (Utils/getAvailablePort (int 6700))
-        server (.bind context nil port)
+        server (.bind context nil 0)
         _ (register-callback (fn [message] (.add resp message) (swap! received inc)) server)
-        client (.connect context nil "localhost" port)
+        client (.connect context nil "localhost" (.getPort server))
         _ (wait-until-ready [server client])]
     (doseq [num (range 1 num-messages)]
       (let [req_msg (str num)]

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 8476e7e..83a3267 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -40,9 +40,9 @@
          conf (if login-cfg (merge conf {"java.security.auth.login.config" login-cfg}) conf)]
     conf))
 
-(defmacro with-test-cluster [args & body]
+(defmacro with-test-cluster [[cluster-sym & args] & body]
   `(let [conf# (to-conf ~@args)]
-     (with-open [_# (.build (doto (LocalCluster$Builder. )
+     (with-open [~cluster-sym (.build (doto (LocalCluster$Builder. )
                       (.withNimbusDaemon)
                       (.withDaemonConf conf#)
                       (.withSupervisors 0)
@@ -50,21 +50,19 @@
        ~@body)))
 
 (deftest Simple-authentication-test
-  (let [port (Utils/getAvailablePort)]
-    (with-test-cluster [port nil nil "org.apache.storm.security.auth.SimpleTransportPlugin"]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Positive authorization) Simple protocol w/o authentication/authorization
enforcement"
-                 (is (thrown-cause? NotAliveException
-                              (.activate nimbus_client "topo-name"))))
-        (.close client)))))
+  (with-test-cluster [cluster 0 nil nil "org.apache.storm.security.auth.SimpleTransportPlugin"]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
+                             STORM-NIMBUS-RETRY-TIMES 0})
+          client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Positive authorization) Simple protocol w/o authentication/authorization
enforcement"
+               (is (thrown-cause? NotAliveException
+                            (.activate nimbus_client "topo-name"))))
+      (.close client))))
 
 (deftest test-noop-authorization-w-simple-transport
-  (let [port (Utils/getAvailablePort)
-        cluster-state (Mockito/mock IStormClusterState)
+  (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
         topo-name "topo-name"]
     (.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/empty))
@@ -75,12 +73,12 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
-                                NIMBUS-THRIFT-PORT port
+                                NIMBUS-THRIFT-PORT 0
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                                {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
                                 STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+            client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
             nimbus_client (.getClient client)]
         (testing "(Positive authorization) Authorization plugin should accept client request"
                  (is (thrown-cause? NotAliveException
@@ -88,8 +86,7 @@
         (.close client)))))
 
 (deftest test-deny-authorization-w-simple-transport
-  (let [port (Utils/getAvailablePort)
-        cluster-state (Mockito/mock IStormClusterState)
+  (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
         topo-name "topo-name"
         topo-id "topo-name-1"]
@@ -102,13 +99,13 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                                NIMBUS-THRIFT-PORT port
+                                NIMBUS-THRIFT-PORT 0
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                               {STORM-THRIFT-TRANSPORT-PLUGIN   "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               Config/NIMBUS_THRIFT_PORT port
+                               Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
                                STORM-NIMBUS-RETRY-TIMES        0})
-            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+            client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
             nimbus_client (.getClient client)
             topologyInitialStatus (TopologyInitialStatus/findByValue 2)
             submitOptions (SubmitOptions. topologyInitialStatus)]
@@ -134,26 +131,24 @@
         (.close client)))))
 
 (deftest test-noop-authorization-w-sasl-digest
-  (let [port (Utils/getAvailablePort)]
-    (with-test-cluster [port
-                  "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                  "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
-                  "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               Config/NIMBUS_THRIFT_PORT port
-                               STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Positive authorization) Authorization plugin should accept client request"
-                 (is (thrown-cause? NotAliveException
-                              (.activate nimbus_client "topo-name"))))
-        (.close client)))))
+  (with-test-cluster [cluster 0
+                "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+                "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+                             Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
+                             STORM-NIMBUS-RETRY-TIMES 0})
+          client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Positive authorization) Authorization plugin should accept client request"
+               (is (thrown-cause? NotAliveException
+                            (.activate nimbus_client "topo-name"))))
+      (.close client))))
 
 (deftest test-deny-authorization-w-sasl-digest
-  (let [port (Utils/getAvailablePort)
-        cluster-state (Mockito/mock IStormClusterState)
+  (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
         topo-name "topo-name"
         topo-id "topo-name-1"]
@@ -166,15 +161,15 @@
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                                NIMBUS-THRIFT-PORT port
+                                NIMBUS-THRIFT-PORT 0
                                 "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
                                 STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})))]
       (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                                {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
                                "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               Config/NIMBUS_THRIFT_PORT port
+                               Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
                                STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" port nimbus-timeout)
+            client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
             nimbus_client (.getClient client)
             topologyInitialStatus (TopologyInitialStatus/findByValue 2)
             submitOptions (SubmitOptions. topologyInitialStatus)]

http://git-wip-us.apache.org/repos/asf/storm/blob/01b85647/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 25a3ec6..3f4a773 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -629,6 +629,10 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware,
Iface {
         return getNimbus().getTopologyInfo(id);
     }
 
+    public int getThriftServerPort() {
+        return thriftServer.getPort();
+    }
+
     @Override
     public synchronized void close() throws Exception {
         if (nimbus != null) {


Mime
View raw message