storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2479: Fix port assignment race condition in storm-webapp tests
Date Mon, 08 May 2017 08:04:35 GMT
Repository: storm
Updated Branches:
  refs/heads/master 0639244f7 -> 9755ff547


STORM-2479: Fix port assignment race condition in storm-webapp tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1044473b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1044473b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1044473b

Branch: refs/heads/master
Commit: 1044473bb191c15dabfa434b8126f2fa523f59c9
Parents: c38d795
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
Authored: Sun Apr 16 21:20:15 2017 +0200
Committer: Stig Rohde Døssing <stigdoessing@gmail.com>
Committed: Mon Apr 17 10:31:45 2017 +0200

----------------------------------------------------------------------
 .../jvm/org/apache/storm/nimbus/NimbusInfo.java |   2 +-
 .../storm/security/auth/ITransportPlugin.java   |   5 +
 .../security/auth/SaslTransportPlugin.java      |  13 +-
 .../security/auth/SimpleTransportPlugin.java    |  11 +-
 .../storm/security/auth/ThriftServer.java       |  45 +++-
 .../storm/security/auth/ThriftServerTest.java   |  38 ---
 .../apache/storm/security/auth/auth_test.clj    | 244 +++++++++----------
 .../apache/storm/daemon/drpc/DRPCServer.java    |  41 +++-
 .../storm/daemon/drpc/DRPCServerTest.java       |  55 ++---
 9 files changed, 229 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
index 2b5033e..d8f8adf 100644
--- a/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
@@ -39,7 +39,7 @@ public class NimbusInfo implements Serializable {
 
     public NimbusInfo(String host, int port, boolean isLeader) {
         if (host == null) throw new NullPointerException("Host cannot be null");
-        if (port <= 0) throw new IllegalArgumentException("Port must be positive");
+        if (port < 0) throw new IllegalArgumentException("Port cannot be negative");
         this.host = host;
         this.port = port;
         this.isLeader = isLeader;

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index c0ab525..c60b2f2 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -54,4 +54,9 @@ public interface ITransportPlugin {
      *               Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user.
      */
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException;
+    
+    /**
+     * @return The port this transport is using. This is not known until {@link #getServer(org.apache.thrift.TProcessor)} has been called.
+     */
+    public int getPort();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
index cad2b30..d93573b 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
@@ -53,6 +53,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
     protected ThriftConnectionType type;
     protected Map storm_conf;
     protected Configuration login_conf;
+    private int port;
 
     @Override
     public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -63,15 +64,16 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
 
     @Override
     public TServer getServer(TProcessor processor) throws IOException, TTransportException {
-        int port = type.getPort(storm_conf);
+        int configuredPort = type.getPort(storm_conf);
         Integer socketTimeout = type.getSocketTimeOut(storm_conf);
         TTransportFactory serverTransportFactory = getServerTransportFactory();
         TServerSocket serverTransport = null;
         if (socketTimeout != null) {
-            serverTransport = new TServerSocket(port, socketTimeout);
+            serverTransport = new TServerSocket(configuredPort, socketTimeout);
         } else {
-            serverTransport = new TServerSocket(port);
+            serverTransport = new TServerSocket(configuredPort);
         }
+        this.port = serverTransport.getServerSocket().getLocalPort();
         int numWorkerThreads = type.getNumThreads(storm_conf);
         Integer queueSize = type.getQueueSize(storm_conf);
 
@@ -100,6 +102,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
      * @throws IOException
      */
     protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+    
+    @Override
+    public int getPort() {
+        return this.port;
+    }
 
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 26cd4a1..b41af75 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -55,6 +55,7 @@ public class SimpleTransportPlugin implements ITransportPlugin {
     protected Map storm_conf;
     protected Configuration login_conf;
     private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
+    private int port;
 
     @Override
     public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -65,8 +66,9 @@ public class SimpleTransportPlugin implements ITransportPlugin {
 
     @Override
     public TServer getServer(TProcessor processor) throws IOException, TTransportException {
-        int port = type.getPort(storm_conf);
-        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
+        int configuredPort = type.getPort(storm_conf);
+        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(configuredPort);
+        this.port = serverTransport.getPort();
         int numWorkerThreads = type.getNumThreads(storm_conf);
         int maxBufferSize = type.getMaxBufferSize(storm_conf);
         Integer queueSize = type.getQueueSize(storm_conf);
@@ -113,6 +115,11 @@ public class SimpleTransportPlugin implements ITransportPlugin {
         return null;
     }
 
+    @Override
+    public int getPort() {
+        return port;
+    }
+
     /**
      * Processor that populate simple transport info into ReqContext, and then invoke a service handler
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index f97dceb..059b0d6 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -17,12 +17,14 @@
  */
 package org.apache.storm.security.auth;
 
+import java.io.IOException;
 import java.util.Map;
 
 import javax.security.auth.login.Configuration;
 
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +33,9 @@ public class ThriftServer {
     private Map _storm_conf; //storm configuration
     protected TProcessor _processor = null;
     private final ThriftConnectionType _type;
-    private TServer _server = null;
+    private TServer _server;
     private Configuration _login_conf;
+    private int _port;
     
     public ThriftServer(Map storm_conf, TProcessor processor, ThriftConnectionType type) {
         _storm_conf = storm_conf;
@@ -45,34 +48,50 @@ public class ThriftServer {
         } catch (Exception x) {
             LOG.error(x.getMessage(), x);
         }
+        try {
+            //locate our thrift transport plugin
+            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
+            //server
+            _server = transportPlugin.getServer(_processor);
+            _port = transportPlugin.getPort();
+        } catch (IOException | TTransportException ex) {
+            handleServerException(ex);
+        }
+
     }
 
     public void stop() {
-        if (_server != null)
-            _server.stop();
+        _server.stop();
     }
 
     /**
      * @return true if ThriftServer is listening to requests?
      */
     public boolean isServing() {
-        return _server != null && _server.isServing();
+        return _server.isServing();
     }
     
     public void serve()  {
         try {
-            //locate our thrift transport plugin
-            ITransportPlugin  transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
-
-            //server
-            _server = transportPlugin.getServer(_processor);
-
             //start accepting requests
             _server.serve();
         } catch (Exception ex) {
-            LOG.error("ThriftServer is being stopped due to: " + ex, ex);
-            if (_server != null) _server.stop();
-            Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+            handleServerException(ex);
+        }
+    }
+    
+    private void handleServerException(Exception ex) {
+        LOG.error("ThriftServer is being stopped due to: " + ex, ex);
+        if (_server != null) {
+            _server.stop();
         }
+        Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+    }
+    
+    /**
+     * @return The port this server is/will be listening on
+     */
+    public int getPort() {
+        return _port;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
deleted file mode 100644
index cad1f1d..0000000
--- a/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth;
-
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
-
-import org.junit.Test;
-
-public class ThriftServerTest {
-
-    @Test
-    public void testStopChecksForNull() {
-        ThriftServer server = new ThriftServer(Utils.readDefaultConfig(), null, ThriftConnectionType.DRPC);
-        server.stop();
-    }
-
-    @Test
-    public void testIsServingChecksForNull() {
-        ThriftServer server = new ThriftServer(Utils.readDefaultConfig(), null, ThriftConnectionType.DRPC);
-        Assert.assertFalse(server.isServing());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index fc95097..6eec5db 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -119,10 +119,10 @@
      (dummy-service-handler conf inimbus nil)))
 
 
-(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf]
+(defn launch-server [login-cfg aznClass transportPluginClass serverConf]
   (let [conf1 (merge (clojurify-structure (ConfigUtils/readStormConfig))
                      {NIMBUS-AUTHORIZER aznClass
-                      NIMBUS-THRIFT-PORT server-port
+                      NIMBUS-THRIFT-PORT 0
                       STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
         conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
         conf (if serverConf (merge conf2 serverConf) conf2)
@@ -137,10 +137,10 @@
     (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing server)))) (fn [] (Time/sleep 100)))
     server ))
 
-(defmacro with-server [args & body]
-  `(let [server# (launch-server ~@args)]
+(defmacro with-server [[server-sym & args] & body]
+  `(let [~server-sym (launch-server ~@args)]
      ~@body
-     (.stop server#)
+     (.stop ~server-sym)
      ))
 
 (deftest kerb-to-local-test
@@ -151,49 +151,46 @@
     (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm"))))))
 
 (deftest Simple-authentication-test
-  (let [a-port (Utils/getAvailablePort)]
-    (with-server [a-port nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (.activate nimbus_client "security_auth_test_topology")
-        (.close client))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})]
-        (testing "(Negative authentication) Server: Simple vs. Client: Digest"
-          (is (thrown-cause?  org.apache.thrift.transport.TTransportException
-                              (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
+  (with-server [server nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (.activate nimbus_client "security_auth_test_topology")
+      (.close client))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})]
+      (testing "(Negative authentication) Server: Simple vs. Client: Digest"
+        (is (thrown-cause?  org.apache.thrift.transport.TTransportException
+                            (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
 
 (deftest negative-whitelist-authorization-test
-  (let [a-port (Utils/getAvailablePort)]
-    (with-server [a-port nil
-                  "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
-                  "org.apache.storm.testing.SingleUserSimpleTransport" nil]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Negative authorization) Authorization plugin should reject client request"
-          (is (thrown-cause? AuthorizationException
-                             (.activate nimbus_client "security_auth_test_topology"))))
-        (.close client)))))
+  (with-server [server nil
+                "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+                "org.apache.storm.testing.SingleUserSimpleTransport" nil]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Negative authorization) Authorization plugin should reject client request"
+        (is (thrown-cause? AuthorizationException
+                           (.activate nimbus_client "security_auth_test_topology"))))
+      (.close client))))
 
 (deftest positive-whitelist-authorization-test
-    (let [a-port (Utils/getAvailablePort)]
-      (with-server [a-port nil
-                    "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
-                    "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
-        (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                                {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
-              client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-              nimbus_client (.getClient client)]
-          (testing "(Positive authorization) Authorization plugin should accept client request"
-            (.activate nimbus_client "security_auth_test_topology"))
-          (.close client)))))
+  (with-server [server nil
+                "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+                "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Positive authorization) Authorization plugin should accept client request"
+        (.activate nimbus_client "security_auth_test_topology"))
+      (.close client))))
 
 (deftest simple-acl-user-auth-test
   (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -326,91 +323,88 @@
 
 
 (deftest positive-authorization-test
-  (let [a-port (Utils/getAvailablePort)]
-    (with-server [a-port nil
-                  "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
-                  "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Positive authorization) Authorization plugin should accept client request"
-          (.activate nimbus_client "security_auth_test_topology"))
-        (.close client)))))
+  (with-server [server nil
+                "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+                "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Positive authorization) Authorization plugin should accept client request"
+        (.activate nimbus_client "security_auth_test_topology"))
+      (.close client))))
 
 (deftest deny-authorization-test
-  (let [a-port (Utils/getAvailablePort)]
-    (with-server [a-port nil
-                  "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                  "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN         "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               Config/NIMBUS_THRIFT_PORT       a-port
-                               DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Negative authorization) Authorization plugin should reject client request"
-          (is (thrown-cause? AuthorizationException
-                             (.activate nimbus_client "security_auth_test_topology"))))
-        (.close client)))))
+  (with-server [server nil
+                "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+                "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN         "org.apache.storm.security.auth.SimpleTransportPlugin"
+                             Config/NIMBUS_THRIFT_PORT       (.getPort server)
+                             DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Negative authorization) Authorization plugin should reject client request"
+        (is (thrown-cause? AuthorizationException
+                           (.activate nimbus_client "security_auth_test_topology"))))
+        (.close client))))
 
 (deftest digest-authentication-test
-  (let [a-port (Utils/getAvailablePort)]
-    (with-server [a-port
-                  "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                  nil
-                  "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Positive authentication) valid digest authentication"
-          (.activate nimbus_client "security_auth_test_topology"))
-        (.close client))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
-                               STORM-NIMBUS-RETRY-TIMES 0})
-            client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
-            nimbus_client (.getClient client)]
-        (testing "(Negative authentication) Server: Digest vs. Client: Simple"
-          (is (thrown-cause? org.apache.thrift.transport.TTransportException
-                             (.activate nimbus_client "security_auth_test_topology"))))
-        (.close client))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})]
-        (testing "(Negative authentication) Invalid  password"
-          (is (thrown-cause? TTransportException
-                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})]
-        (testing "(Negative authentication) Unknown user"
-          (is (thrown-cause? TTransportException
-                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})]
-        (testing "(Negative authentication) nonexistent configuration file"
-          (is (thrown-cause? RuntimeException
-                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
-      (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                              {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                               "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
-                               STORM-NIMBUS-RETRY-TIMES 0})]
-        (testing "(Negative authentication) Missing client"
-          (is (thrown-cause? java.io.IOException
-                             (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
+  (with-server [server
+                "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+                nil
+                "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Positive authentication) valid digest authentication"
+        (.activate nimbus_client "security_auth_test_topology"))
+      (.close client))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
+                             STORM-NIMBUS-RETRY-TIMES 0})
+          client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+          nimbus_client (.getClient client)]
+      (testing "(Negative authentication) Server: Digest vs. Client: Simple"
+        (is (thrown-cause? org.apache.thrift.transport.TTransportException
+                           (.activate nimbus_client "security_auth_test_topology"))))
+      (.close client))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})]
+      (testing "(Negative authentication) Invalid  password"
+        (is (thrown-cause? TTransportException
+                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})]
+      (testing "(Negative authentication) Unknown user"
+        (is (thrown-cause? TTransportException
+                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})]
+      (testing "(Negative authentication) nonexistent configuration file"
+        (is (thrown-cause? RuntimeException
+                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+    (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+                            {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
+                             STORM-NIMBUS-RETRY-TIMES 0})]
+      (testing "(Negative authentication) Missing client"
+        (is (thrown-cause? java.io.IOException
+                           (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
 
 (deftest test-GetTransportPlugin-throws-RuntimeException
   (let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
index 0b15cfa..3e37ad7 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -61,7 +61,7 @@ public class DRPCServer implements AutoCloseable {
  
     private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
         ThriftServer ret = null;
-        if (port != null && port > 0) {
+        if (port != null && port >= 0) {
             ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
                     ThriftConnectionType.DRPC);
         }
@@ -76,7 +76,7 @@ public class DRPCServer implements AutoCloseable {
     private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
         Integer drpcHttpPort = (Integer) conf.get(DaemonConfig.DRPC_HTTP_PORT);
         Server ret = null;
-        if (drpcHttpPort != null && drpcHttpPort > 0) {
+        if (drpcHttpPort != null && drpcHttpPort >= 0) {
             LOG.info("Starting RPC HTTP servers...");
             String filterClass = (String) (conf.get(DaemonConfig.DRPC_HTTP_FILTER));
             @SuppressWarnings("unchecked")
@@ -119,6 +119,7 @@ public class DRPCServer implements AutoCloseable {
     private final ThriftServer _handlerServer;
     private final ThriftServer _invokeServer;
     private final Server _httpServer;
+    private Thread _handlerServerThread;
     private boolean _closed = false;
 
     public DRPCServer(Map<String, Object> conf) {
@@ -133,13 +134,21 @@ public class DRPCServer implements AutoCloseable {
     void start() throws Exception {
         LOG.info("Starting Distributed RPC servers...");
         new Thread(() -> _invokeServer.serve()).start();
-
+        
         if (_httpServer != null) {
             _httpServer.start();
         }
         
         if (_handlerServer != null) {
-            _handlerServer.serve();
+            _handlerServerThread = new Thread(_handlerServer::serve);
+            _handlerServerThread.start();
+        }
+    }
+    
+    @VisibleForTesting
+    void awaitTermination() throws InterruptedException {
+        if(_handlerServerThread != null) {
+            _handlerServerThread.join();
         } else {
             _httpServer.join();
         }
@@ -169,6 +178,29 @@ public class DRPCServer implements AutoCloseable {
         }
     }
     
+    /**
+     * @return The port the DRPC handler server is listening on
+     */
+    public int getDRPCPort() {
+        return _handlerServer.getPort();
+    }
+    
+    /**
+     * @return The port the DRPC invoke server is listening on
+     */
+    public int getDRPCInvokePort() {
+        return _invokeServer.getPort();
+    }
+    
+    /**
+     * @return The port the HTTP server is listening on. Not available until {@link #start() } has run.
+     */
+    public int getHttpServerPort() {
+        assert _httpServer.getConnectors().length == 1;
+        
+        return _httpServer.getConnectors()[0].getLocalPort();
+    }
+    
     public static void main(String [] args) throws Exception {
         Utils.setupDefaultUncaughtExceptionHandler();
         Map<String, Object> conf = Utils.readStormConfig();
@@ -176,6 +208,7 @@ public class DRPCServer implements AutoCloseable {
             Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
             StormMetricsRegistry.startMetricsReporters(conf);
             server.start();
+            server.awaitTermination();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
index 594e9c3..76652e2 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -38,7 +38,6 @@ import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DRPCRequest;
 import org.apache.storm.security.auth.SimpleTransportPlugin;
 import org.apache.storm.utils.DRPCClient;
-import org.apache.storm.utils.Utils;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -87,16 +86,11 @@ public class DRPCServerTest {
     
     @Test
     public void testGoodThrift() throws Exception {
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        Map<String, Object> conf = getConf(0, 0, null);
         try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
-                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+            server.start();
+            try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort());
+                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
                 Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
                 DRPCRequest request = getNextAvailableRequest(invoke, "testing");
                 assertNotNull(request);
@@ -111,16 +105,11 @@ public class DRPCServerTest {
     
     @Test
     public void testFailedThrift() throws Exception {
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        Map<String, Object> conf = getConf(0, 0, null);
         try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
-                    DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+            server.start();
+            try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort());
+                    DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
                 Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
                 DRPCRequest request = getNextAvailableRequest(invoke, "testing");
                 assertNotNull(request);
@@ -155,19 +144,13 @@ public class DRPCServerTest {
     @Test
     public void testGoodHttpGet() throws Exception {
         LOG.info("STARTING HTTP GET TEST...");
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        Map<String, Object> conf = getConf(0, 0, 0);
         try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
+            server.start();
             //TODO need a better way to do this
             Thread.sleep(2000);
-            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
+                Future<String> found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test"));
                 DRPCRequest request = getNextAvailableRequest(invoke, "testing");
                 assertNotNull(request);
                 assertEquals("test", request.get_func_args());
@@ -182,19 +165,13 @@ public class DRPCServerTest {
     @Test
     public void testFailedHttpGet() throws Exception {
         LOG.info("STARTING HTTP GET (FAIL) TEST...");
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        Map<String, Object> conf = getConf(0, 0, 0);
         try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
+            server.start();
             //TODO need a better way to do this
             Thread.sleep(2000);
-            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
+                Future<String> found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test"));
                 DRPCRequest request = getNextAvailableRequest(invoke, "testing");
                 assertNotNull(request);
                 assertEquals("test", request.get_func_args());


Mime
View raw message