storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [23/50] storm git commit: Merge branch 'master' into security-upmerge
Date Thu, 13 Nov 2014 19:37:18 GMT
Merge branch 'master' into security-upmerge

Conflicts:
	README.markdown
	conf/defaults.yaml
	pom.xml
	storm-core/src/clj/backtype/storm/daemon/common.clj
	storm-core/src/clj/backtype/storm/daemon/supervisor.clj
	storm-core/src/clj/backtype/storm/ui/core.clj
	storm-core/src/jvm/backtype/storm/Config.java
	storm-core/src/jvm/backtype/storm/utils/Utils.java
	storm-core/test/clj/backtype/storm/supervisor_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84b7deaa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84b7deaa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84b7deaa

Branch: refs/heads/master
Commit: 84b7deaa5badc07b7ea8a88fef589577c837f8ee
Parents: 7a06ef7 d9b68ab
Author: Robert (Bobby) Evans <bobby@apache.org>
Authored: Thu Aug 28 13:48:29 2014 -0500
Committer: Robert (Bobby) Evans <bobby@apache.org>
Committed: Thu Aug 28 13:48:29 2014 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |  15 +++
 README.markdown                                 |   7 ++
 STORM-UI-REST-API.md                            |   1 +
 bin/storm.cmd                                   |  10 +-
 conf/defaults.yaml                              |   1 +
 .../jvm/storm/starter/PrintSampleStream.java    |   4 +-
 pom.xml                                         |   8 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |   4 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   3 +
 .../clj/backtype/storm/daemon/supervisor.clj    |  32 +++--
 .../src/clj/backtype/storm/daemon/worker.clj    |   1 -
 storm-core/src/clj/backtype/storm/event.clj     |   3 +
 .../backtype/storm/scheduler/EvenScheduler.clj  |   2 +-
 storm-core/src/clj/backtype/storm/testing.clj   |   9 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  40 ++++---
 storm-core/src/jvm/backtype/storm/Config.java   |  42 +++++--
 .../storm/multilang/JsonSerializer.java         |  14 ++-
 .../DefaultSerializationDelegate.java           |  61 ++++++++++
 .../GzipBridgeSerializationDelegate.java        |  64 ++++++++++
 .../GzipSerializationDelegate.java              |  68 +++++++++++
 .../serialization/SerializationDelegate.java    |  35 ++++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   9 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  60 ++++++----
 .../test/clj/backtype/storm/scheduler_test.clj  |  21 ++++
 .../test/clj/backtype/storm/supervisor_test.clj | 118 ++++++++-----------
 .../GzipBridgeSerializationDelegateTest.java    |  82 +++++++++++++
 26 files changed, 555 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/README.markdown
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index 3e00b3d,0050227..2601bee
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -37,16 -35,8 +37,17 @@@ storm.zookeeper.auth.password: nul
  storm.cluster.mode: "distributed" # can be distributed or local
  storm.local.mode.zmq: false
  storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
 +storm.principal.tolocal: "backtype.storm.security.auth.DefaultPrincipalToLocal"
 +storm.group.mapping.service: "backtype.storm.security.auth.ShellBasedGroupsMapping"
  storm.messaging.transport: "backtype.storm.messaging.netty.Context"
 +storm.nimbus.retry.times: 5
 +storm.nimbus.retry.interval.millis: 2000
 +storm.nimbus.retry.intervalceiling.millis: 60000
 +storm.auth.simple-white-list.users: []
 +storm.auth.simple-acl.users: []
 +storm.auth.simple-acl.users.commands: []
 +storm.auth.simple-acl.admins: []
+ storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
  
  ### nimbus.* configs are for the master
  nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index ba3d1ce,3db8b41..7d31916
--- a/pom.xml
+++ b/pom.xml
@@@ -186,10 -186,9 +186,10 @@@
          <commons-lang.version>2.5</commons-lang.version>
          <commons-exec.version>1.1</commons-exec.version>
          <clj-time.version>0.4.1</clj-time.version>
-         <curator.version>2.4.0</curator.version>
+         <curator.version>2.5.0</curator.version>
          <json-simple.version>1.1</json-simple.version>
 -        <ring.version>0.3.11</ring.version>
 +        <ring.version>1.3.0</ring.version>
 +        <jetty.version>7.6.13.v20130916</jetty.version>
          <clojure.tools.logging.version>0.2.3</clojure.tools.logging.version>
          <clojure.math.numeric-tower.version>0.0.1</clojure.math.numeric-tower.version>
          <carbonite.version>1.4.0</carbonite.version>
@@@ -207,10 -206,7 +207,10 @@@
          <mockito.version>1.9.5</mockito.version>
          <reply.version>0.3.0</reply.version>
          <conjure.version>2.1.3</conjure.version>
-         <zookeeper.version>3.4.5</zookeeper.version>
+         <zookeeper.version>3.4.6</zookeeper.version>
 +        <conjure.version>2.1.3</conjure.version>
 +        <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
 +        <clojure-contrib.version>1.2.0</clojure-contrib.version>
  
      </properties>
  

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/LocalCluster.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/common.clj
index cf0549c,0b45535..f091dfb
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@@ -21,7 -21,7 +21,8 @@@
    (:import [backtype.storm.task WorkerTopologyContext])
    (:import [backtype.storm Constants])
    (:import [backtype.storm.metric SystemBolt])
 +  (:import [backtype.storm.security.auth IAuthorizer]) 
+   (:import [java.io InterruptedIOException])
    (:require [clojure.set :as set])  
    (:require [backtype.storm.daemon.acker :as acker])
    (:require [backtype.storm.thrift :as thrift])

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 0aea7ec,9937607..d0d71c3
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -850,42 -816,38 +850,50 @@@
    [sys?]
    (if (or (nil? sys?) (= "false" sys?)) false true))
  
- (defn json-response [data & [status]]
-   {:status (or status 200)
-    :headers {"Content-Type" "application/json"}
-    :body (to-json data)})
+ (defn wrap-json-in-callback [callback response]
+   (str callback "(" response ");"))
+ 
+ (defnk json-response
+   [data callback :serialize-fn to-json :status 200]
+      {:status status
+       :headers (if (not-nil? callback) {"Content-Type" "application/javascript"}
+                 {"Content-Type" "application/json"})
+       :body (if (not-nil? callback)
+               (wrap-json-in-callback callback (serialize-fn data))
+               (serialize-fn data))})
  
 +(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 +
  (defroutes main-routes
-   (GET "/api/v1/cluster/configuration" []
-        (cluster-configuration))
-   (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]}]
+   (GET "/api/v1/cluster/configuration" [& m]
+        (json-response (cluster-configuration)
+                       (:callback m) :serialize-fn identity))
 -  (GET "/api/v1/cluster/summary" [& m]
 -       (json-response (cluster-summary) (:callback m)))
 -  (GET "/api/v1/supervisor/summary" [& m]
++  (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
 +       (let [user (.getUserName http-creds-handler servlet-request)]
 +         (assert-authorized-user servlet-request "getClusterInfo")
-          (json-response (cluster-summary user))))
-   (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]}]
++         (json-response (cluster-summary user) (:callback m))))
++  (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
 +       (assert-authorized-user servlet-request "getClusterInfo")
-        (json-response (supervisor-summary)))
-   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]}]
+        (json-response (supervisor-summary) (:callback m)))
 -  (GET "/api/v1/topology/summary" [& m]
++  (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
 +       (assert-authorized-user servlet-request "getClusterInfo")
-        (json-response (all-topologies-summary)))
+        (json-response (all-topologies-summary) (:callback m)))
 -  (GET  "/api/v1/topology/:id" [id & m]
 -        (let [id (url-decode id)]
 -          (json-response (topology-page id (:window m) (check-include-sys? (:sys m))) (:callback
m))))
 +  (GET  "/api/v1/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
 +        (let [id (url-decode id)
 +              user (.getUserName http-creds-handler servlet-request)]
 +          (assert-authorized-user servlet-request "getTopology" (topology-config id))
-           (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user))))
++          (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user)
(:callback m))))
    (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id &
m]
 -       (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m)))
(:callback m)))
 -  (GET "/api/v1/topology/:id/component/:component" [id component & m]
 +        (let [id (url-decode id)]
 +          (assert-authorized-user servlet-request "getTopology" (topology-config id))
-           (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys
m))))))
++          (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys
m))) (:callback m))))
 +  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request]}
id component & m]
         (let [id (url-decode id)
 -             component (url-decode component)]
 -         (json-response (component-page id component (:window m) (check-include-sys? (:sys
m))) (:callback m))))
 -  (POST "/api/v1/topology/:id/activate" [id]
 +             component (url-decode component)
 +             user (.getUserName http-creds-handler servlet-request)]
 +         (assert-authorized-user servlet-request "getTopology" (topology-config id))
-          (json-response (component-page id component (:window m) (check-include-sys? (:sys
m)) user))))
++         (json-response (component-page id component (:window m) (check-include-sys? (:sys
m)) user) (:callback m))))
 +  (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id]
      (with-nimbus nimbus
        (let [id (url-decode id)
              tplg (.getTopologyInfo ^Nimbus$Client nimbus id)

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index c15c45d,8407010..a38b51e
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -102,14 -95,15 +102,21 @@@ public class Config extends HashMap<Str
       */
      public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
      public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
 -    
 +
 +    /**
 +     * Netty based messaging: Is authentication required for Netty messaging from client
worker process to server worker process.
 +     */
 +    public static final String STORM_MESSAGING_NETTY_AUTHENTICATION = "storm.messaging.netty.authentication";
 +    public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class;
 +
      /**
+      * The delegate for serializing metadata, should be used for serialized objects stored
in zookeeper and on disk.
+      * This is NOT used for compressing serialized tuples sent between topologies.
+      */
+     public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";
+     public static final Object STORM_META_SERIALIZATION_DELEGATE_SCHEMA = String.class;
+     
+     /**
       * A list of hosts of ZooKeeper servers used to manage the cluster.
       */
      public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
@@@ -746,22 -473,12 +753,25 @@@
      public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
  
      /**
 +     * Should the supervior try to run the worker as the lauching user or not.  Defaults
to false.
 +     */
 +    public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
 +    public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class;
 +
 +    /**
 +     * Full path to the worker-laucher executable that will be used to lauch workers when
 +     * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
 +     */
 +    public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher";
 +    public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 +
 +    /**
-      * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings
are replaced
-      * with an identifier for this worker. Also, "%WORKER-ID%", "%STORM-ID%" and "%WORKER-PORT%"
are
-      * replaced with appropriate runtime values for this worker.
+      * The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%",
"%TOPOLOGY-ID%"
+      * and "%WORKER-PORT%" substrings are replaced with:
+      * %ID%          -> port (for backward compatibility),
+      * %WORKER-ID%   -> worker-id, 
+      * %TOPOLOGY-ID%    -> topology-id,
+      * %WORKER-PORT% -> port.
       */
      public static final String WORKER_CHILDOPTS = "worker.childopts";
      public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index fff91e6,a657d2b..6d31d28
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -478,66 -447,24 +469,87 @@@ public class Utils 
          return false;
      }
  
 +    /**
 +     * Is the cluster configured to interact with ZooKeeper in a secure way?
 +     * This only works when called from within Nimbus or a Supervisor process.
 +     * @param conf the storm configuration, not the topology configuration
 +     * @return true if it is configured else false.
 +     */
 +    public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
 +        return null != System.getProperty("java.security.auth.login.config")
 +            || (conf != null
 +                && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
 +                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
 +    }
 +
 +    /**
 +     * Is the topology configured to have ZooKeeper authentication.
 +     * @param conf the topology configuration
 +     * @return true if ZK is configured else false
 +     */
 +    public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
 +        return (conf != null
 +                && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
 +                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
 +    }
 +
 +    public static List<ACL> getWorkerACL(Map conf) {
 +        //This is a work around to an issue with ZK where a sasl super user is not super
unless there is an open SASL ACL so we are trying to give the correct perms
 +        if (!isZkAuthenticationConfiguredTopology(conf)) {
 +            return null;
 +        }
 +        String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
 +        if (stormZKUser == null) {
 +           throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+"
is not set");
 +        }
 +        String[] split = stormZKUser.split(":",2);
 +        if (split.length != 2) {
 +          throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not
appear to be in the form scheme:acl, i.e. sasl:storm-user");
 +        }
 +        ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
 +        ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
 +        return ret;
 +    }
 +
 +   public static String threadDump() {
 +       final StringBuilder dump = new StringBuilder();
 +       final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
 +       final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(),
100);
 +       for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
 +           dump.append('"');
 +           dump.append(threadInfo.getThreadName());
 +           dump.append("\" ");
 +           final Thread.State state = threadInfo.getThreadState();
 +           dump.append("\n   java.lang.Thread.State: ");
 +           dump.append(state);
 +           final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
 +           for (final StackTraceElement stackTraceElement : stackTraceElements) {
 +               dump.append("\n        at ");
 +               dump.append(stackTraceElement);
 +           }
 +           dump.append("\n\n");
 +       }
 +       return dump.toString();
 +   }
++
+     // Assumes caller is synchronizing
+     private static SerializationDelegate getSerializationDelegate(Map stormConf) {
+         String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
+         SerializationDelegate delegate;
+         try {
+             Class delegateClass = Class.forName(delegateClassName);
+             delegate = (SerializationDelegate) delegateClass.newInstance();
+         } catch (ClassNotFoundException e) {
+             LOG.error("Failed to construct serialization delegate, falling back to default",
e);
+             delegate = new DefaultSerializationDelegate();
+         } catch (InstantiationException e) {
+             LOG.error("Failed to construct serialization delegate, falling back to default",
e);
+             delegate = new DefaultSerializationDelegate();
+         } catch (IllegalAccessException e) {
+             LOG.error("Failed to construct serialization delegate, falling back to default",
e);
+             delegate = new DefaultSerializationDelegate();
+         }
+         delegate.prepare(stormConf);
+         return delegate;
+     }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/84b7deaa/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/supervisor_test.clj
index 6f02688,5abfa89..c94688f
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@@ -463,94 -354,35 +463,90 @@@
    ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't
get rid of downloaded code
    )
  
 +(deftest test-supervisor-data-acls
 +  (testing "supervisor-data uses correct ACLs"
 +    (let [scheme "digest"
 +          digest "storm:thisisapoorpassword"
 +          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
 +                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
 +          expected-acls supervisor/SUPERVISOR-ZK-ACLS
 +          fake-isupervisor (reify ISupervisor
 +                             (getSupervisorId [this] nil)
 +                             (getAssignmentId [this] nil))]
 +      (stubbing [uptime-computer nil
 +                 cluster/mk-storm-cluster-state nil
 +                 supervisor-state nil
 +                 local-hostname nil
 +                 mk-timer nil]
 +        (supervisor/supervisor-data auth-conf nil fake-isupervisor)
 +        (verify-call-times-for cluster/mk-storm-cluster-state 1)
 +        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
 +                                            expected-acls)))))
 +
 +(deftest test-write-log-metadata
 +  (testing "supervisor writes correct data to logs metadata file"
 +    (let [exp-owner "alice"
 +          exp-worker-id "42"
 +          exp-storm-id "0123456789"
 +          exp-port 4242
 +          exp-logs-users ["bob" "charlie" "daryl"]
 +          storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
 +                      TOPOLOGY-USERS ["charlie" "bob"]
 +                      LOGS-USERS ["daryl"]}
 +          exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
 +                    "worker-id" exp-worker-id
 +                    LOGS-USERS exp-logs-users}
 +          conf {}]
 +      (mocking [supervisor/write-log-metadata-to-yaml-file!]
 +        (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
 +                                        exp-storm-id exp-port conf)
 +        (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
 +                                      exp-storm-id exp-port exp-data conf)))))
 +
 +(deftest test-worker-launcher-requires-user
 +  (testing "worker-launcher throws on blank user"
 +    (mocking [launch-process]
 +      (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
 +                                  #"(?i).*user cannot be blank.*"
 +                                  (supervisor/worker-launcher {} nil ""))))))
 +
 +(defn found? [sub-str input-str]
 +  (if (string? input-str)
 +    (contrib-str/substring? sub-str (str input-str))
 +    (some? #(contrib-str/substring? sub-str %) input-str)))
 +
 +(defn not-found? [sub-str input-str]
 +    (complement (found? sub-str input-str)))
 +
- (deftest test-substitute-childopts-happy-path
+ (deftest test-substitute-childopts-happy-path-string
    (testing "worker-launcher replaces ids in childopts"
-     (let [ worker-id "w-01"
-            storm-id "s-01"
-            port 9999
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-            ]
-       (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id storm-id
port))
-       (is (not-found? "%WORKER-ID%" childopts-with-ids))
-       (is (found? "w-01" childopts-with-ids))
-       (is (not-found? "%STORM-ID%" childopts-with-ids))
-       (is (found? "s-01" childopts-with-ids))
-       (is (not-found? "%WORKER-PORT%" childopts-with-ids))
-       (is (found? "-9999." childopts-with-ids))
-       (is (not-found? "%ID%" childopts-with-ids))
-       (is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
-     )))
+     (let [worker-id "w-01"
+           topology-id "s-01"
+           port 9999
+           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log
-Xms256m"
+           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log"
"-Xms256m")
+           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id
port)]
+       (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-storm-id-alone
+ (deftest test-substitute-childopts-happy-path-list
    (testing "worker-launcher replaces ids in childopts"
-     (let [ worker-id "w-01"
-            storm-id "s-01"
-            port 9999
-            childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%STORM-ID%.log"]
-            (def childopts-with-ids (supervisor/substitute-childopts childopts worker-id
storm-id port))
-            (is (not-found? "%WORKER-ID%" childopts-with-ids))
-            (is (not-found? "w-01" childopts-with-ids))
-            (is (not-found? "%STORM-ID%" childopts-with-ids))
-            (is (found? "s-01" childopts-with-ids))
-            (is (not-found? "%WORKER-PORT%" childopts-with-ids))
-            (is (not-found? "-9999." childopts-with-ids))
-            (is (not-found? "%ID%" childopts-with-ids))
-            (is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) 
   )))
+     (let [worker-id "w-01"
+           topology-id "s-01"
+           port 9999
+           childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
"-Xms256m")
+           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log"
"-Xms256m")
+           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id
port)]
+       (is (= expected-childopts childopts-with-ids)))))
+ 
+ (deftest test-substitute-childopts-topology-id-alone
+   (testing "worker-launcher replaces ids in childopts"
+     (let [worker-id "w-01"
+           topology-id "s-01"
+           port 9999
+           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id
port)]
+       (is (= expected-childopts childopts-with-ids)))))
  
  (deftest test-substitute-childopts-no-keys
    (testing "worker-launcher has no ids to replace in childopts"


Mime
View raw message