storm-commits mailing list archives

From kabh...@apache.org
Subject [02/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:36 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/native/worker-launcher/impl/worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.c b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
index 5dc90d3..7baec01 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.c
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
@@ -414,33 +414,45 @@ static int copy_file(int input, const char* in_filename,
   return 0;
 }
 
-int setup_stormdist(FTSENT* entry, uid_t euser) {
+/**
+ * Sets up permissions for a directory optionally making it user-writable.
+ * We set up the permissions r(w)xrws--- so that the file group (should be Storm's user group)
+ * has complete access to the directory, and the file user (the topology owner's user)
+ * is able to read and execute, and in certain directories, write. The setGID bit is set
+ * to make sure any files created under the directory will be accessible to Storm's user for
+ * cleanup purposes.
+ */
+static int setup_permissions(FTSENT* entry, uid_t euser, int user_write) {
   if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
-    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
-      strerror(errno));
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s, fts_path=%s\n",
+            strerror(errno), entry->fts_path);
      return -1;
   }
   mode_t mode = entry->fts_statp->st_mode;
-  mode_t new_mode = (mode & (S_IRWXU)) | S_IRGRP | S_IWGRP;
-  if ((mode & S_IXUSR) == S_IXUSR) {
-    new_mode = new_mode | S_IXGRP;
+  // Preserve user read and execute and set group read and write.
+  mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
+  if (user_write) {
+    new_mode = new_mode | S_IWUSR;
   }
+  // If the entry is a directory, add group execute and setGID bits.
   if ((mode & S_IFDIR) == S_IFDIR) {
-    new_mode = new_mode | S_ISGID;
+    new_mode = new_mode | S_IXGRP | S_ISGID;
   }
   if (chmod(entry->fts_path, new_mode) != 0) {
-    fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
-      strerror(errno));
+    fprintf(ERRORFILE, "Failure to exec app initialization process - %s, fts_path=%s\n",
+            strerror(errno), entry->fts_path);
     return -1;
   }
   return 0;
 }
 
-int setup_stormdist_dir(const char* local_dir) {
+
+int setup_dir_permissions(const char* local_dir, int user_writable) {
   //This is the same as
   //> chmod g+rwX -R $local_dir
-  //> chown -no-dereference -R $user:$supervisor-group $local_dir 
-
+  //> chmod g+s -R $local_dir
+  //> if [ $user_writable ]; then chmod u+w;  else u-w; fi
+  //> chown -no-dereference -R $user:$supervisor-group $local_dir
   int exit_code = 0;
   uid_t euser = geteuid();
 
@@ -450,7 +462,7 @@ int setup_stormdist_dir(const char* local_dir) {
   } else {
     char *(paths[]) = {strndup(local_dir,PATH_MAX), 0};
     if (paths[0] == NULL) {
-      fprintf(ERRORFILE, "Malloc failed in setup_stormdist_dir\n");
+      fprintf(ERRORFILE, "Malloc failed in setup_dir_permissions\n");
       return -1;
     }
     // check to make sure the directory exists
@@ -485,14 +497,14 @@ int setup_stormdist_dir(const char* local_dir) {
 
       case FTS_DP:        // A directory being visited in post-order
       case FTS_DOT:       // A dot directory
+      case FTS_SL:        // A symbolic link
+      case FTS_SLNONE:    // A broken symbolic link
         //NOOP
         fprintf(LOGFILE, "NOOP: %s\n", entry->fts_path); break;
       case FTS_D:         // A directory in pre-order
       case FTS_F:         // A regular file
-      case FTS_SL:        // A symbolic link
-      case FTS_SLNONE:    // A broken symbolic link
-        if (setup_stormdist(entry, euser) != 0) {
-          exit_code = -1;
+        if (setup_permissions(entry, euser, user_writable) != 0) {
+            exit_code = -1;
         }
         break;
       case FTS_DEFAULT:   // Unknown type of file
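
For context (not part of the commit): the new setup_permissions() keeps the user's read and execute bits, always grants group read and write, optionally restores user write, and adds group execute plus the setGID bit on directories, giving the r(w)xrws--- scheme described in the comment above. A minimal standalone C sketch of just that mode computation, for illustration only:

#include <stdio.h>
#include <sys/stat.h>

/* Same bit logic as setup_permissions() above, pulled out for clarity. */
static mode_t compute_mode(mode_t mode, int user_write) {
  /* Preserve user read/execute; always grant group read and write. */
  mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
  if (user_write) {
    new_mode |= S_IWUSR;
  }
  /* Directories also get group execute and setGID so files created
   * underneath stay accessible to the supervisor's group. */
  if ((mode & S_IFDIR) == S_IFDIR) {
    new_mode |= S_IXGRP | S_ISGID;
  }
  return new_mode;
}

int main(void) {
  printf("dir  0755 -> %04o\n", compute_mode(S_IFDIR | 0755, 0)); /* 2570 = r-xrws--- */
  printf("file 0644 -> %04o\n", compute_mode(S_IFREG | 0644, 1)); /* 0660 = rw-rw---- */
  return 0;
}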

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/native/worker-launcher/impl/worker-launcher.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.h b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
index 3b1ec24..4a0b657 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.h
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
@@ -66,7 +66,7 @@ extern FILE *LOGFILE;
 // the log file for error messages
 extern FILE *ERRORFILE;
 
-int setup_stormdist_dir(const char* local_dir);
+int setup_dir_permissions(const char* local_dir, int for_blob_permission);
 
 int exec_as_user(const char * working_dir, const char * args);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index d2a9c4f..775949e 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -20,10 +20,14 @@
   (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+  (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm.utils Time])
   (:import [org.apache.storm.tuple Fields])
+  (:import [org.mockito Mockito])
   (:use [org.apache.storm testing config clojure util])
   (:use [org.apache.storm.daemon common])
+  (:require [org.apache.storm [cluster :as cluster]])
+  (:require [org.apache.storm.daemon [executor :as executor]])
   (:require [org.apache.storm [thrift :as thrift]]))
 
 (deftest test-basic-topology
@@ -116,7 +120,6 @@
                              "timeout-tester"
                              {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
                              topology)
-      (advance-cluster-time cluster 11)
       (.feed feeder ["a"] 1)
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
@@ -283,7 +286,6 @@
                              "acking-test1"
                              {}
                              (:topology tracked))
-      (advance-cluster-time cluster 11)
       (.feed feeder1 [1])
       (tracked-wait tracked 1)
       (checker1 0)
@@ -326,7 +328,6 @@
                              "test-acking2"
                              {}
                              (:topology tracked))
-      (advance-cluster-time cluster 11)
       (.feed feeder [1])
       (tracked-wait tracked 1)
       (checker 0)
@@ -372,8 +373,7 @@
         {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
         topology
         (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      (advance-cluster-time cluster 11)
-      (.feed feeder ["a"] 1)
+      (.feedNoWait feeder ["a"] 1)
       (advance-cluster-time cluster 9)
       (is (not @bolt-prepared?))
       (is (not @spout-opened?))        
@@ -397,7 +397,6 @@
                              "test"
                              {}
                              (:topology tracked))
-      (advance-cluster-time cluster 11)
       (.feed feeder [1])
       (tracked-wait tracked 1)
       (checker 1)
@@ -596,60 +595,42 @@
              (read-tuples results "2")
              )))))
 
-(defbolt report-errors-bolt {}
-  [tuple collector]
-  (doseq [i (range (.getValue tuple 0))]
-    (report-error! collector (RuntimeException.)))
-  (ack! collector tuple))
-
+;;This is more of a unit test, but getting the timing right for
+;; an integration test is really hard
 (deftest test-throttled-errors
   (with-simulated-time
-    (with-tracked-cluster [cluster]
-      (let [state (:storm-cluster-state cluster)
-            [feeder checker] (ack-tracking-feeder ["num"])
-            tracked (mk-tracked-topology
-                     cluster
-                     (topology
-                       {"1" (spout-spec feeder)}
-                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
-            _       (submit-local-topology (:nimbus cluster)
-                                             "test-errors"
-                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
-                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
-                                              TOPOLOGY-DEBUG true
-                                              }
-                                             (:topology tracked))
-            _ (advance-cluster-time cluster 11)
-            storm-id (get-storm-id state "test-errors")
-            errors-count (fn [] (count (.errors state storm-id "2")))]
-
-        (is (nil? (.last-error state storm-id "2")))
-
-        ;; so it launches the topology
-        (advance-cluster-time cluster 2)
-        (.feed feeder [6])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 5)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 4 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [2])
-        (tracked-wait tracked 1)
-        (is (= 6 (errors-count)))
-        (is (.last-error state storm-id "2"))
-        
-        (advance-time-secs! 6)
-        (.feed feeder [3])
-        (tracked-wait tracked 1)
-        (is (= 8 (errors-count)))
-        (is (.last-error state storm-id "2"))))))
-
+    (let [error-count (atom 0)
+          worker-context (Mockito/mock WorkerTopologyContext)
+          cluster-state
+             (reify cluster/StormClusterState
+               (report-error
+                 [this storm-id component-id node port error]
+                 (swap! error-count inc)))
+          error-fn (executor/throttled-report-error-fn 
+            {:storm-conf {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
+                          TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4}
+             :storm-cluster-state cluster-state
+             :storm-id "topo"
+             :component-id "comp"
+             :worker-context worker-context})]
+        (. (Mockito/when (.getThisWorkerPort worker-context)) (thenReturn (Integer. 8080))) 
+        (error-fn (RuntimeException. "ERROR-1"))
+        (is (= 1 @error-count))
+        (error-fn (RuntimeException. "ERROR-2"))
+        (is (= 2 @error-count))
+        (error-fn (RuntimeException. "ERROR-3"))
+        (is (= 3 @error-count))
+        (error-fn (RuntimeException. "ERROR-4"))
+        (is (= 4 @error-count))
+        ;;Ignored
+        (error-fn (RuntimeException. "ERROR-5"))
+        (is (= 4 @error-count))
+        (Time/advanceTime 9000)
+        (error-fn (RuntimeException. "ERROR-6"))
+        (is (= 4 @error-count))
+        (Time/advanceTime 2000)
+        (error-fn (RuntimeException. "ERROR-7"))
+        (is (= 5 @error-count)))))
 
 (deftest test-acking-branching-complex
   ;; test acking with branching in the topology
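
For context (not part of the commit): the rewritten test-throttled-errors drives executor/throttled-report-error-fn directly with a mocked WorkerTopologyContext and a reified StormClusterState instead of standing up a tracked cluster. It asserts that at most TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL (4) errors are forwarded per TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS (10 s) window, and that the budget resets once the window expires. A hypothetical standalone C sketch of that throttling behaviour (not Storm's implementation) reproducing the same report/throttle sequence the test asserts:

#include <stdio.h>
#include <time.h>

typedef struct {
  time_t interval_start;      /* start of the current throttle window */
  int interval_secs;          /* TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS */
  int max_per_interval;       /* TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL */
  int reported_in_interval;   /* reports already forwarded this window */
} error_throttle;

/* Returns 1 if the error should be reported, 0 if it is throttled. */
static int throttle_report(error_throttle *t, time_t now) {
  if (now - t->interval_start >= t->interval_secs) {
    t->interval_start = now;            /* window expired: reset the budget */
    t->reported_in_interval = 0;
  }
  if (t->reported_in_interval < t->max_per_interval) {
    t->reported_in_interval++;
    return 1;
  }
  return 0;                             /* over the per-window budget */
}

int main(void) {
  error_throttle t = { 0, 10, 4, 0 };
  time_t now = 0;
  for (int i = 1; i <= 5; i++)          /* ERROR-1..4 reported, ERROR-5 throttled */
    printf("t=%2ld ERROR-%d -> %s\n", (long) now, i,
           throttle_report(&t, now) ? "reported" : "throttled");
  now += 9;                             /* still inside the 10 s window */
  printf("t=%2ld ERROR-6 -> %s\n", (long) now,
         throttle_report(&t, now) ? "reported" : "throttled");
  now += 2;                             /* window expired, budget resets */
  printf("t=%2ld ERROR-7 -> %s\n", (long) now,
         throttle_report(&t, now) ? "reported" : "throttled");
  return 0;
}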

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index cd139d7..584a6f3 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -116,9 +116,9 @@
                              (it/agg-bolt 4))}))]
          (.submitTopology cluster
                           "test-acking2"
-                          (Config.)
+                          (doto (Config.)
+                            (.put TOPOLOGY-DEBUG true))
                           (.getTopology tracked))
-         (advance-cluster-time (.getState cluster) 11)
          (.feed feeder [1])
          (Testing/trackedWait tracked (int 1))
          (checker 0)
@@ -148,13 +148,14 @@
                             "timeout-tester"
                             storm-conf
                             topology)
+           ;;Wait for the topology to come up
            (.feed feeder ["a"] 1)
            (.feed feeder ["b"] 2)
            (.feed feeder ["c"] 3)
            (Testing/advanceClusterTime cluster (int 9))
            (it/assert-acked tracker 1 3)
            (is (not (.isFailed tracker 2)))
-           (Testing/advanceClusterTime cluster (int 12))
+           (Testing/advanceClusterTime cluster (int 20))
            (it/assert-failed tracker 2)
            ))))))
 
@@ -174,19 +175,21 @@
                            {"1" (thrift/mk-spout-spec feeder)}
                            {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
                 storm-conf (doto (Config.)
+                             (.put TOPOLOGY-DEBUG true)
                              (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
                              (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
             (.submitTopology cluster
               "disable-timeout-tester"
               storm-conf
               topology)
+            ;;Wait for the topology to come up
             (.feed feeder ["a"] 1)
             (.feed feeder ["b"] 2)
             (.feed feeder ["c"] 3)
             (Testing/advanceClusterTime cluster (int 9))
             (it/assert-acked tracker 1 3)
             (is (not (.isFailed tracker 2)))
-            (Testing/advanceClusterTime cluster (int 12))
+            (Testing/advanceClusterTime cluster (int 20))
             (is (not (.isFailed tracker 2)))
             ))))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index 4d32a06..aae76e6 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -15,13 +15,16 @@
 ;; limitations under the License.
 (ns org.apache.storm.logviewer-test
   (:use [org.apache.storm config util])
-  (:require [org.apache.storm.daemon [logviewer :as logviewer]
-                                   [supervisor :as supervisor]])
+  (:require [org.apache.storm.daemon [logviewer :as logviewer]])
   (:require [conjure.core])
   (:use [clojure test])
   (:use [conjure core])
-  (:use [org.apache.storm.ui helpers])
-  (:import [org.apache.storm.daemon DirectoryCleaner])
+  (:use [org.apache.storm testing]
+        [org.apache.storm.ui helpers])
+  (:import [org.apache.storm.daemon DirectoryCleaner]
+           [org.apache.storm.daemon.supervisor SupervisorUtils]
+           [org.apache.storm.testing.staticmocking MockedSupervisorUtils]
+           [org.apache.storm.generated LSWorkerHeartbeat])
   (:import [java.nio.file Files Path DirectoryStream])
   (:import [java.nio.file Files])
   (:import [java.nio.file.attribute FileAttribute])
@@ -231,27 +234,33 @@
           mock-metaFile (mk-mock-File {:name "worker.yaml"
                                        :type :file})
           exp-id "id12345"
-          expected {exp-id port1-dir}]
-      (stubbing [supervisor/read-worker-heartbeats nil
-                 logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
-                 logviewer/get-worker-id-from-metadata-file exp-id]
-        (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))
+          expected {exp-id port1-dir}
+          supervisor-util (Mockito/mock SupervisorUtils)]
+      (with-open [_ (MockedSupervisorUtils. supervisor-util)]
+        (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile
+                   logviewer/get-worker-id-from-metadata-file exp-id]
+          (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil))
+          (is (= expected (logviewer/identify-worker-log-dirs [port1-dir]))))))))
 
 (deftest test-get-dead-worker-dirs
-         (testing "return directories for workers that are not alive"
-                  (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
-                        id->hb {"42" {:time-secs 1}} ;; map for alive ids
-                        now-secs 2
-                        unexpected-dir1 (mk-mock-File {:name "dir1" :type :directory})
-                        expected-dir2 (mk-mock-File {:name "dir2" :type :directory})
-                        expected-dir3 (mk-mock-File {:name "dir3" :type :directory})
-                        log-dirs #{unexpected-dir1 expected-dir2 expected-dir3}]
-                       (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir1,
-                                                                      "007" expected-dir2,
-                                                                      "" expected-dir3} ;; this tests a directory with no yaml file thus no worker id
-                                  supervisor/read-worker-heartbeats id->hb]
-                                 (is (= #{expected-dir2 expected-dir3}
-                                        (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))
+  (testing "return directories for workers that are not alive"
+    (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5}
+          hb (let [lwb (LSWorkerHeartbeat.)]
+                   (.set_time_secs lwb (int 1)) lwb)
+          id->hb {"42" hb}
+          now-secs 2
+          unexpected-dir1 (mk-mock-File {:name "dir1" :type :directory})
+          expected-dir2 (mk-mock-File {:name "dir2" :type :directory})
+          expected-dir3 (mk-mock-File {:name "dir3" :type :directory})
+          log-dirs #{unexpected-dir1 expected-dir2 expected-dir3}
+          supervisor-util (Mockito/mock SupervisorUtils)]
+      (with-open [_ (MockedSupervisorUtils. supervisor-util)]
+      (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir1,
+                                                     "007" expected-dir2,
+                                                     "" expected-dir3}] ;; this tests a directory with no yaml file thus no worker id
+        (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb))
+        (is (= #{expected-dir2 expected-dir3}
+              (logviewer/get-dead-worker-dirs conf now-secs log-dirs))))))))
 
 (deftest test-cleanup-fn
   (testing "cleanup function rmr's files of dead workers"

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 9f051f6..ba48ea9 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -260,18 +260,18 @@
                     {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]      
       (submit-local-topology (:nimbus cluster)
                              "metrics-tester"
-                             {}
+                             {TOPOLOGY-DEBUG true}
                              topology)
       
       (.feed feeder ["a"] 1)
       (advance-cluster-time cluster 6)
-      (assert-acked tracker 1)
       (assert-buckets! "myspout" "__fail-count/default" [] cluster)
       (assert-buckets! "myspout" "__ack-count/default" [1] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [1] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [1] cluster)            
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1] cluster)     
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1] cluster)
+      (assert-acked tracker 1)
 
       (.feed feeder ["b"] 2)      
       (advance-cluster-time cluster 5)
@@ -312,18 +312,19 @@
                     {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]      
       (submit-local-topology (:nimbus cluster)
                              "timeout-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10
+                              TOPOLOGY-DEBUG true}
                              topology)
       (.feed feeder ["a"] 1)
       (.feed feeder ["b"] 2)
       (.feed feeder ["c"] 3)
       (advance-cluster-time cluster 9)
-      (assert-acked tracker 1 3)
       (assert-buckets! "myspout" "__ack-count/default" [2] cluster)
       (assert-buckets! "myspout" "__emit-count/default" [3] cluster)
       (assert-buckets! "myspout" "__transfer-count/default" [3] cluster)
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [2] cluster)
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [3] cluster)
+      (assert-acked tracker 1 3)
       
       (is (not (.isFailed tracker 2)))
       (advance-cluster-time cluster 30)

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index dbb89bd..14dd181 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1056,6 +1056,14 @@
         (assert-files-in-dir [])
         ))))
 
+(defn wait-for-status [nimbus name status]
+  (while-timeout 5000
+    (let [topo-summary (first (filter (fn [topo] (= name (.get_name topo))) (.get_topologies (.getClusterInfo nimbus))))
+          topo-status (if topo-summary (.get_status topo-summary) "NOT-RUNNING")]
+      (log-message "WAITING FOR "name" TO BE " status " CURRENT " topo-status)
+      (not= topo-status status))
+    (Thread/sleep 100)))
+
 (deftest test-leadership
   "Tests that leader actions can only be performed by master and non leader fails to perform the same actions."
   (with-inprocess-zookeeper zk-port
@@ -1082,7 +1090,7 @@
               (submit-local-topology nimbus "t1" {} topology)
               ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
               (.rebalance nimbus "t1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-              (Thread/sleep 1000)
+              (wait-for-status nimbus "t1" "ACTIVE") 
               (.deactivate nimbus "t1")
               (.activate nimbus "t1")
               (.rebalance nimbus "t1" (RebalanceOptions.))
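
For context (not part of the commit): the new wait-for-status helper replaces a fixed (Thread/sleep 1000) with a bounded poll, re-reading the topology summary every 100 ms until its status matches or roughly 5 seconds have passed. A hypothetical standalone C sketch of that bounded-polling pattern (the status callback here is a stand-in, not a Storm API):

#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Stand-in for whatever status lookup the caller supplies. */
static const char *fake_topology_status(void) {
  static int calls = 0;
  return (++calls < 3) ? "NOT-RUNNING" : "ACTIVE";
}

/* Poll until the status matches or ~timeout_ms elapses; returns 0 on success. */
static int wait_for_status(const char *(*status_fn)(void), const char *expected,
                           int timeout_ms, int poll_ms) {
  for (int waited = 0; waited < timeout_ms; waited += poll_ms) {
    if (strcmp(status_fn(), expected) == 0)
      return 0;                               /* reached the expected status */
    usleep((useconds_t) poll_ms * 1000);      /* brief back-off before retrying */
  }
  return -1;                                  /* gave up after the timeout */
}

int main(void) {
  printf("wait_for_status returned %d\n",
         wait_for_status(fake_topology_status, "ACTIVE", 5000, 100));
  return 0;
}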

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
index d6a431f..c1eacd6 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
@@ -98,42 +98,42 @@
 
 (defmacro with-simple-drpc-test-scenario
   [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
-  (let [client-port (available-port)
-        invocations-port (available-port (inc client-port))
-        storm-conf (merge (read-storm-config)
-                          {DRPC-AUTHORIZER-ACL-STRICT strict?
+  `(let [client-port# (available-port)
+         invocations-port# (available-port (inc client-port#))
+         storm-conf# (merge (read-storm-config)
+                          {DRPC-AUTHORIZER-ACL-STRICT ~strict?
                            DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
                            STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})]
-    `(with-server [~storm-conf
+    (with-server [storm-conf#
                    "org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
                    "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
                    "test/clj/org/apache/storm/security/auth/drpc-auth-server.jaas"
-                   ~client-port ~invocations-port]
+                   client-port# invocations-port#]
        (let [~alice-client (DRPCClient.
-                           (merge ~storm-conf {"java.security.auth.login.config"
+                           (merge storm-conf# {"java.security.auth.login.config"
                                               "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
                            "localhost"
-                           ~client-port)
+                           client-port#)
              ~bob-client (DRPCClient.
-                         (merge ~storm-conf {"java.security.auth.login.config"
+                         (merge storm-conf# {"java.security.auth.login.config"
                                             "test/clj/org/apache/storm/security/auth/drpc-auth-bob.jaas"})
                          "localhost"
-                         ~client-port)
+                         client-port#)
              ~charlie-client (DRPCClient.
-                               (merge ~storm-conf {"java.security.auth.login.config"
+                               (merge storm-conf# {"java.security.auth.login.config"
                                                   "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
                                "localhost"
-                               ~client-port)
+                               client-port#)
              ~alice-invok (DRPCInvocationsClient.
-                            (merge ~storm-conf {"java.security.auth.login.config"
+                            (merge storm-conf# {"java.security.auth.login.config"
                                                "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
                             "localhost"
-                            ~invocations-port)
+                            invocations-port#)
              ~charlie-invok (DRPCInvocationsClient.
-                             (merge ~storm-conf {"java.security.auth.login.config"
+                             (merge storm-conf# {"java.security.auth.login.config"
                                                 "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
                              "localhost"
-                             ~invocations-port)]
+                             invocations-port#)]
          (try
            ~@body
            (finally

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
deleted file mode 100644
index ef9f883..0000000
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ /dev/null
@@ -1,737 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.supervisor-test
-  (:use [clojure test])
-  (:require [conjure.core])
-  (:use [conjure core])
-  (:require [clojure.contrib [string :as contrib-str]])
-  (:require [clojure [string :as string] [set :as set]])
-  (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
-  (:import [org.apache.storm.scheduler ISupervisor])
-  (:import [org.apache.storm.generated RebalanceOptions])
-  (:import [java.util UUID])
-  (:import [java.io File])
-  (:import [java.nio.file Files])
-  (:import [java.nio.file.attribute FileAttribute])
-  (:use [org.apache.storm config testing util timer])
-  (:use [org.apache.storm.daemon common])
-  (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
-            [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
-  (:use [conjure core])
-  (:require [clojure.java.io :as io]))
-
-(defn worker-assignment
-  "Return [storm-id executors]"
-  [cluster supervisor-id port]
-  (let [state (:storm-cluster-state cluster)
-        slot-assigns (for [storm-id (.assignments state nil)]
-                        (let [executors (-> (.assignment-info state storm-id nil)
-                                        :executor->node+port
-                                        reverse-map
-                                        (get [supervisor-id port] ))]
-                          (when executors [storm-id executors])
-                          ))
-        ret (find-first not-nil? slot-assigns)]
-    (when-not ret
-      (throw-runtime "Could not find assignment for worker"))
-    ret
-    ))
-
-(defn heartbeat-worker [supervisor port storm-id executors]
-  (let [conf (.get-conf supervisor)]
-    (worker/do-heartbeat {:conf conf
-                          :port port
-                          :storm-id storm-id
-                          :executors executors
-                          :worker-id (find-worker-id conf port)})))
-
-(defn heartbeat-workers [cluster supervisor-id ports]
-  (let [sup (get-supervisor cluster supervisor-id)]
-    (doseq [p ports]
-      (let [[storm-id executors] (worker-assignment cluster supervisor-id p)]
-        (heartbeat-worker sup p storm-id executors)
-        ))))
-
-(defn validate-launched-once [launched supervisor->ports storm-id]
-  (let [counts (map count (vals launched))
-        launched-supervisor->ports (apply merge-with set/union
-                                          (for [[[s p] sids] launched
-                                                :when (some #(= % storm-id) sids)]
-                                            {s #{p}}))
-        supervisor->ports (map-val set supervisor->ports)]
-    (is (every? (partial = 1) counts))
-    (is (= launched-supervisor->ports supervisor->ports))
-    ))
-
-(deftest launches-assignment
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {NIMBUS-DO-NOT-REASSIGN true
-                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
-                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
-                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
-                       {}))
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test"
-                        {TOPOLOGY-WORKERS 3}
-                        topology
-                        {1 "1"
-                         2 "1"
-                         3 "1"
-                         4 "1"}
-                        {[1 1] ["sup1" 1]
-                         [2 2] ["sup1" 2]
-                         [3 3] ["sup1" 3]
-                         [4 4] ["sup1" 3]}
-                        {["sup1" 1] [0.0 0.0 0.0]
-                         ["sup1" 2] [0.0 0.0 0.0]
-                         ["sup1" 3] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [1 2 3])
-                      (advance-cluster-time cluster 10)))
-      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
-      (bind changed (capture-changed-workers
-                        (doseq [i (range 10)]
-                          (heartbeat-workers cluster "sup1" [1 2 3])
-                          (advance-cluster-time cluster 10))
-                        ))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      (bind changed (capture-changed-workers
-                      (heartbeat-workers cluster "sup1" [1 2])
-                      (advance-cluster-time cluster 10)
-                      ))
-      (validate-launched-once (:launched changed) {"sup1" [3]} storm-id)
-      (is (= {["sup1" 3] 1} (:shutdown changed)))
-      )))
-
-(deftest test-multiple-active-storms-multiple-supervisors
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {NIMBUS-DO-NOT-REASSIGN true
-                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
-                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
-                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
-                       {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
-                       {}))
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test"
-                        {TOPOLOGY-WORKERS 3 TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}
-                        topology
-                        {1 "1"
-                         2 "1"
-                         3 "1"
-                         4 "1"}
-                        {[1 1] ["sup1" 1]
-                         [2 2] ["sup1" 2]
-                         [3 3] ["sup2" 1]
-                         [4 4] ["sup2" 1]}
-                        {["sup1" 1] [0.0 0.0 0.0]
-                         ["sup1" 2] [0.0 0.0 0.0]
-                         ["sup2" 1] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [1 2])
-                      (heartbeat-workers cluster "sup2" [1])
-                      ))
-      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
-      (bind changed (capture-changed-workers
-                      (submit-mocked-assignment
-                        (:nimbus cluster)
-                        (:storm-cluster-state cluster)
-                        "test2"
-                        {TOPOLOGY-WORKERS 2}
-                        topology2
-                        {1 "1"
-                         2 "1"
-                         3 "1"}
-                        {[1 1] ["sup1" 3]
-                         [2 2] ["sup1" 3]
-                         [3 3] ["sup2" 2]}
-                        {["sup1" 3] [0.0 0.0 0.0]
-                         ["sup2" 2] [0.0 0.0 0.0]
-                         })
-                      ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                      (.rebalance (:nimbus cluster) "test2" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                      (advance-cluster-time cluster 2)
-                      (heartbeat-workers cluster "sup1" [3])
-                      (heartbeat-workers cluster "sup2" [2])
-                      ))
-      (bind storm-id2 (get-storm-id (:storm-cluster-state cluster) "test2"))
-      (is (empty? (:shutdown changed)))
-      (validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
-      (bind changed (capture-changed-workers
-        (.killTopology (:nimbus cluster) "test")
-        (doseq [i (range 4)]
-          (advance-cluster-time cluster 8)
-          (heartbeat-workers cluster "sup1" [1 2 3])
-          (heartbeat-workers cluster "sup2" [1 2])
-          )))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      (bind changed (capture-changed-workers
-        (advance-cluster-time cluster 12)
-        ))
-      (is (empty? (:launched changed)))
-      (is (= {["sup1" 1] 1 ["sup1" 2] 1 ["sup2" 1] 1} (:shutdown changed)))
-      (bind changed (capture-changed-workers
-        (doseq [i (range 10)]
-          (heartbeat-workers cluster "sup1" [3])
-          (heartbeat-workers cluster "sup2" [2])
-          (advance-cluster-time cluster 10)
-          )))
-      (is (empty? (:shutdown changed)))
-      (is (empty? (:launched changed)))
-      ;; TODO check that downloaded code is cleaned up only for the one storm
-      )))
-
-(defn get-heartbeat [cluster supervisor-id]
-  (.supervisor-info (:storm-cluster-state cluster) supervisor-id))
-
-(defn check-heartbeat [cluster supervisor-id within-secs]
-  (let [hb (get-heartbeat cluster supervisor-id)
-        time-secs (:time-secs hb)
-        now (current-time-secs)
-        delta (- now time-secs)]
-    (is (>= delta 0))
-    (is (<= delta within-secs))
-    ))
-
-(deftest heartbeats-to-nimbus
-  (with-simulated-time-local-cluster [cluster :supervisors 0
-    :daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
-                  SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
-    (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
-      (advance-cluster-time cluster 4)
-      (bind hb (get-heartbeat cluster "sup"))
-      (is (= #{5 6 7} (set (:meta hb))))
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 3)
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 3)
-      (check-heartbeat cluster "sup" 3)
-      (advance-cluster-time cluster 15)
-      (check-heartbeat cluster "sup" 3)
-      (bind topology (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
-                       {}))
-      ;; prevent them from launching by capturing them
-      (capture-changed-workers
-       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 3)
-       (check-heartbeat cluster "sup" 3)
-       (advance-cluster-time cluster 20)
-       (check-heartbeat cluster "sup" 3))
-      )))
-
-(deftest test-worker-launch-command
-  (testing "*.worker.childopts configuration"
-    (let [mock-port "42"
-          mock-storm-id "fake-storm-id"
-          mock-worker-id "fake-worker-id"
-          mock-mem-onheap 512
-          mock-cp (str file-path-separator "base" class-path-separator file-path-separator "stormjar.jar")
-          mock-sensitivity "S3"
-          mock-cp "/base:/stormjar.jar"
-          exp-args-fn (fn [opts topo-opts classpath]
-                       (concat [(supervisor/java-cmd) "-cp" classpath
-                               (str "-Dlogfile.name=" "worker.log")
-                               "-Dstorm.home="
-                               (str "-Dworkers.artifacts=" "/tmp/workers-artifacts")
-                               (str "-Dstorm.id=" mock-storm-id)
-                               (str "-Dworker.id=" mock-worker-id)
-                               (str "-Dworker.port=" mock-port)
-                               "-Dstorm.log.dir=/logs"
-                               "-Dlog4j.configurationFile=/log4j2/worker.xml"
-                               "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
-                               "org.apache.storm.LogWriter"]
-                               [(supervisor/java-cmd) "-server"]
-                               opts
-                               topo-opts
-                               ["-Djava.library.path="
-                                (str "-Dlogfile.name=" "worker.log")
-                                "-Dstorm.home="
-                                "-Dworkers.artifacts=/tmp/workers-artifacts"
-                                "-Dstorm.conf.file="
-                                "-Dstorm.options="
-                                (str "-Dstorm.log.dir=" file-path-separator "logs")
-                                (str "-Djava.io.tmpdir=" file-path-separator "storm-local" file-path-separator
-                                     "workers" file-path-separator mock-worker-id file-path-separator "tmp")
-                                (str "-Dlogging.sensitivity=" mock-sensitivity)
-                                (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
-                                "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"
-                                (str "-Dstorm.id=" mock-storm-id)
-                                (str "-Dworker.id=" mock-worker-id)
-                                (str "-Dworker.port=" mock-port)
-                                "-cp" classpath
-                                "org.apache.storm.daemon.worker"
-                                mock-storm-id
-                                mock-port
-                                mock-worker-id]))]
-      (testing "testing *.worker.childopts as strings with extra spaces"
-        (let [string-opts "-Dfoo=bar  -Xmx1024m"
-              topo-string-opts "-Dkau=aux   -Xmx2048m"
-              exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
-                                    ["-Dkau=aux" "-Xmx2048m"]
-                                    mock-cp)
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS string-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-string-opts}
-                     add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
-                     launch-process nil
-                     set-worker-user! nil
-                     supervisor/jlp nil
-                     worker-artifacts-root "/tmp/workers-artifacts"
-                     supervisor/write-log-metadata! nil
-                     supervisor/create-blobstore-links nil]
-            (supervisor/launch-worker mock-supervisor
-                                      mock-storm-id
-                                      mock-port
-                                      mock-worker-id
-                                      mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
-                                                [0]
-                                                exp-args))))
-      (testing "testing *.worker.childopts as list of strings, with spaces in values"
-        (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
-              topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
-              exp-args (exp-args-fn list-opts topo-list-opts mock-cp)
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                      WORKER-CHILDOPTS list-opts}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-list-opts}
-                     add-to-classpath mock-cp
-                     supervisor-stormdist-root nil
-                     launch-process nil
-                     set-worker-user! nil
-                     supervisor/jlp nil
-                     supervisor/write-log-metadata! nil
-                     supervisor/create-blobstore-links nil
-                     worker-artifacts-root "/tmp/workers-artifacts"]
-            (supervisor/launch-worker mock-supervisor
-                                      mock-storm-id
-                                      mock-port
-                                      mock-worker-id
-                                      mock-mem-onheap)
-            (verify-first-call-args-for-indices launch-process
-                                                [0]
-                                                exp-args))))
-      (testing "testing topology.classpath is added to classpath"
-        (let [topo-cp (str file-path-separator "any" file-path-separator "path")
-              exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
-                     supervisor-stormdist-root nil
-                     supervisor/jlp nil
-                     worker-artifacts-root "/tmp/workers-artifacts"
-                     set-worker-user! nil
-                     supervisor/write-log-metadata! nil
-                     launch-process nil
-                     current-classpath (str file-path-separator "base")
-                     supervisor/create-blobstore-links nil]
-                    (supervisor/launch-worker mock-supervisor
-                                              mock-storm-id
-                                              mock-port
-                                              mock-worker-id
-                                              mock-mem-onheap)
-                    (verify-first-call-args-for-indices launch-process
-                                                        [0]
-                                                        exp-args))))
-      (testing "testing topology.environment is added to environment for worker launch"
-        (let [topo-env {"THISVAR" "somevalue" "THATVAR" "someothervalue"}
-              full-env (merge topo-env {"LD_LIBRARY_PATH" nil})
-              exp-args (exp-args-fn [] [] mock-cp)
-              mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
-          (stubbing [read-supervisor-storm-conf {TOPOLOGY-ENVIRONMENT topo-env}
-                     supervisor-stormdist-root nil
-                     supervisor/jlp nil
-                     worker-artifacts-root "/tmp/workers-artifacts"
-                     launch-process nil
-                     set-worker-user! nil
-                     supervisor/write-log-metadata! nil
-                     current-classpath (str file-path-separator "base")
-                     supervisor/create-blobstore-links nil]
-                    (supervisor/launch-worker mock-supervisor
-                                              mock-storm-id
-                                              mock-port
-                                              mock-worker-id
-                                              mock-mem-onheap)
-                    (verify-first-call-args-for-indices launch-process
-                                                        [2]
-                                                        full-env)))))))
-
-(deftest test-worker-launch-command-run-as-user
-  (testing "*.worker.childopts configuration"
-    (let [mock-port "42"
-          mock-storm-id "fake-storm-id"
-          mock-worker-id "fake-worker-id"
-          mock-mem-onheap 512
-          mock-sensitivity "S3"
-          mock-cp "mock-classpath'quote-on-purpose"
-          attrs (make-array FileAttribute 0)
-          storm-local (.getCanonicalPath (.toFile (Files/createTempDirectory "storm-local" attrs)))
-          worker-script (str storm-local "/workers/" mock-worker-id "/storm-worker-script.sh")
-          exp-launch ["/bin/worker-launcher"
-                      "me"
-                      "worker"
-                      (str storm-local "/workers/" mock-worker-id)
-                      worker-script]
-          exp-script-fn (fn [opts topo-opts]
-                          (str "#!/bin/bash\n'export' 'LD_LIBRARY_PATH=';\n\nexec 'java'"
-                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                               " '-Dlogfile.name=" "worker.log'"
-                               " '-Dstorm.home='"
-                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
-                               " '-Dstorm.id=" mock-storm-id "'"
-                               " '-Dworker.id=" mock-worker-id "'"
-                               " '-Dworker.port=" mock-port "'"
-                               " '-Dstorm.log.dir=/logs'"
-                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
-                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                               " 'org.apache.storm.LogWriter'"
-                               " 'java' '-server'"
-                               " " (shell-cmd opts)
-                               " " (shell-cmd topo-opts)
-                               " '-Djava.library.path='"
-                               " '-Dlogfile.name=" "worker.log'"
-                               " '-Dstorm.home='"
-                               " '-Dworkers.artifacts=" (str storm-local "/workers-artifacts'")
-                               " '-Dstorm.conf.file='"
-                               " '-Dstorm.options='"
-                               " '-Dstorm.log.dir=/logs'"
-                               " '-Djava.io.tmpdir=" (str storm-local "/workers/" mock-worker-id "/tmp'")
-                               " '-Dlogging.sensitivity=" mock-sensitivity "'"
-                               " '-Dlog4j.configurationFile=/log4j2/worker.xml'"
-                               " '-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector'"
-                               " '-Dstorm.id=" mock-storm-id "'"
-                               " '-Dworker.id=" mock-worker-id "'"
-                               " '-Dworker.port=" mock-port "'"
-                               " '-cp' 'mock-classpath'\"'\"'quote-on-purpose'"
-                               " 'org.apache.storm.daemon.worker'"
-                               " '" mock-storm-id "'"
-                               " '" mock-port "'"
-                               " '" mock-worker-id "';"))]
-      (try
-        (testing "testing *.worker.childopts as strings with extra spaces"
-          (let [string-opts "-Dfoo=bar  -Xmx1024m"
-                topo-string-opts "-Dkau=aux   -Xmx2048m"
-                exp-script (exp-script-fn ["-Dfoo=bar" "-Xmx1024m"]
-                                          ["-Dkau=aux" "-Xmx2048m"])
-                _ (.mkdirs (io/file storm-local "workers" mock-worker-id))
-                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                        STORM-LOCAL-DIR storm-local
-                                        STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
-                                        SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS string-opts}}]
-            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-string-opts
-                                                   TOPOLOGY-SUBMITTER-USER "me"}
-                       add-to-classpath mock-cp
-                       supervisor-stormdist-root nil
-                       launch-process nil
-                       set-worker-user! nil
-                       supervisor/java-cmd "java"
-                       supervisor/jlp nil
-                       supervisor/write-log-metadata! nil]
-                      (supervisor/launch-worker mock-supervisor
-                                                mock-storm-id
-                                                mock-port
-                                                mock-worker-id
-                                                mock-mem-onheap)
-                      (verify-first-call-args-for-indices launch-process
-                                                          [0]
-                                                          exp-launch))
-            (is (= (slurp worker-script) exp-script))))
-        (finally (rmr storm-local)))
-      (.mkdirs (io/file storm-local "workers" mock-worker-id))
-      (try
-        (testing "testing *.worker.childopts as list of strings, with spaces in values"
-          (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
-                topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
-                exp-script (exp-script-fn list-opts topo-list-opts)
-                mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
-                                        STORM-LOCAL-DIR storm-local
-                                        STORM-WORKERS-ARTIFACTS-DIR (str storm-local "/workers-artifacts")
-                                        SUPERVISOR-RUN-WORKER-AS-USER true
-                                        WORKER-CHILDOPTS list-opts}}]
-            (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
-                                                   topo-list-opts
-                                                   TOPOLOGY-SUBMITTER-USER "me"}
-                       add-to-classpath mock-cp
-                       supervisor-stormdist-root nil
-                       launch-process nil
-                       set-worker-user! nil
-                       supervisor/java-cmd "java"
-                       supervisor/jlp nil
-                       supervisor/write-log-metadata! nil]
-                      (supervisor/launch-worker mock-supervisor
-                                                mock-storm-id
-                                                mock-port
-                                                mock-worker-id
-                                                mock-mem-onheap)
-                      (verify-first-call-args-for-indices launch-process
-                                                          [0]
-                                                          exp-launch))
-            (is (= (slurp worker-script) exp-script))))
-        (finally (rmr storm-local))))))
-
-(deftest test-workers-go-bananas
-  ;; test that multiple workers are started for a port, and test that
-  ;; supervisor shuts down propertly (doesn't shutdown the most
-  ;; recently launched one, checks heartbeats correctly, etc.)
-  )
-
-(deftest downloads-code
-  )
-
-(deftest test-stateless
-  )
-
-(deftest cleans-up-on-unassign
-  ;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
-  )
-
-(deftest test-supervisor-data-acls
-  (testing "supervisor-data uses correct ACLs"
-    (let [scheme "digest"
-          digest "storm:thisisapoorpassword"
-          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
-                     STORM-ZOOKEEPER-AUTH-PAYLOAD digest}
-          expected-acls supervisor/SUPERVISOR-ZK-ACLS
-          fake-isupervisor (reify ISupervisor
-                             (getSupervisorId [this] nil)
-                             (getAssignmentId [this] nil))]
-      (stubbing [uptime-computer nil
-                 cluster/mk-storm-cluster-state nil
-                 supervisor-state nil
-                 local-hostname nil
-                 mk-timer nil
-                 supervisor-local-dir nil]
-        (supervisor/supervisor-data auth-conf nil fake-isupervisor)
-        (verify-call-times-for cluster/mk-storm-cluster-state 1)
-        (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
-                                            expected-acls)))))
-
-(deftest test-write-log-metadata
-  (testing "supervisor writes correct data to logs metadata file"
-    (let [exp-owner "alice"
-          exp-worker-id "42"
-          exp-storm-id "0123456789"
-          exp-port 4242
-          exp-logs-users ["bob" "charlie" "daryl"]
-          exp-logs-groups ["read-only-group" "special-group"]
-          storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                      TOPOLOGY-USERS ["charlie" "bob"]
-                      TOPOLOGY-GROUPS ["special-group"]
-                      LOGS-GROUPS ["read-only-group"]
-                      LOGS-USERS ["daryl"]}
-          exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                    "worker-id" exp-worker-id
-                    LOGS-USERS exp-logs-users
-                    LOGS-GROUPS exp-logs-groups}
-          conf {}]
-      (mocking [supervisor/write-log-metadata-to-yaml-file!]
-        (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
-                                        exp-storm-id exp-port conf)
-        (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
-                                      exp-storm-id exp-port exp-data conf)))))
-
-(deftest test-worker-launcher-requires-user
-  (testing "worker-launcher throws on blank user"
-    (mocking [launch-process]
-      (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
-                                  #"(?i).*user cannot be blank.*"
-                                  (supervisor/worker-launcher {} nil ""))))))
-
-(defn found? [sub-str input-str]
-  (if (string? input-str)
-    (contrib-str/substring? sub-str (str input-str))
-    (boolean (some #(contrib-str/substring? sub-str %) input-str))))
-
-(defn not-found? [sub-str input-str]
-    (complement (found? sub-str input-str)))
-
-(deftest test-substitute-childopts-happy-path-string
-  (testing "worker-launcher replaces ids in childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-happy-path-list
-  (testing "worker-launcher replaces ids in childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-happy-path-list-arraylist
-  (testing "worker-launcher replaces ids in childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-topology-id-alone
-  (testing "worker-launcher replaces ids in childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-no-keys
-  (testing "worker-launcher has no ids to replace in childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-nil-childopts
-  (testing "worker-launcher has nil childopts"
-    (let [worker-id "w-01"
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts nil
-          expected-childopts nil
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-substitute-childopts-nil-ids
-  (testing "worker-launcher has nil ids"
-    (let [worker-id nil
-          topology-id "s-01"
-          port 9999
-          mem-onheap 512
-          childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-          expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-          childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-      (is (= expected-childopts childopts-with-ids)))))
-
-(deftest test-retry-read-assignments
-  (with-simulated-time-local-cluster [cluster
-                                      :supervisors 0
-                                      :ports-per-supervisor 2
-                                      :daemon-conf {NIMBUS-DO-NOT-REASSIGN true
-                                                    NIMBUS-MONITOR-FREQ-SECS 10
-                                                    TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                    TOPOLOGY-ACKER-EXECUTORS 0}]
-    (letlocals
-     (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-     (bind topology1 (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                      {}))
-     (bind topology2 (thrift/mk-topology
-                      {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                      {}))
-     (bind state (:storm-cluster-state cluster))
-     (bind changed (capture-changed-workers
-                    (submit-mocked-assignment
-                     (:nimbus cluster)
-                     (:storm-cluster-state cluster)
-                     "topology1"
-                     {TOPOLOGY-WORKERS 2}
-                     topology1
-                     {1 "1"
-                      2 "1"}
-                     {[1 1] ["sup1" 1]
-                      [2 2] ["sup1" 2]}
-                     {["sup1" 1] [0.0 0.0 0.0]
-                      ["sup1" 2] [0.0 0.0 0.0]
-                      })
-                    (submit-mocked-assignment
-                     (:nimbus cluster)
-                     (:storm-cluster-state cluster)
-                     "topology2"
-                     {TOPOLOGY-WORKERS 2}
-                     topology2
-                     {1 "1"
-                      2 "1"}
-                     {[1 1] ["sup1" 1]
-                      [2 2] ["sup1" 2]}
-                     {["sup1" 1] [0.0 0.0 0.0]
-                      ["sup1" 2] [0.0 0.0 0.0]
-                      })
-                    ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                    (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                    ))
-     (is (empty? (:launched changed)))
-     (bind options (RebalanceOptions.))
-     (.set_wait_secs options 0)
-     (bind changed (capture-changed-workers
-                    (.rebalance (:nimbus cluster) "topology2" options)
-                    (advance-cluster-time cluster 10)
-                    (heartbeat-workers cluster "sup1" [1 2 3 4])
-                    (advance-cluster-time cluster 10)
-                    ))
-     (validate-launched-once (:launched changed)
-                             {"sup1" [1 2]}
-                             (get-storm-id (:storm-cluster-state cluster) "topology1"))
-     (validate-launched-once (:launched changed)
-                             {"sup1" [3 4]}
-                             (get-storm-id (:storm-cluster-state cluster) "topology2"))
-     )))

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
new file mode 100644
index 0000000..3e41125
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+public class BasicContainerTest {
+    public static class CommandRun {
+        final List<String> cmd;
+        final Map<String, String> env;
+        final File pwd;
+        
+        public CommandRun(List<String> cmd, Map<String, String> env, File pwd) {
+            this.cmd = cmd;
+            this.env = env;
+            this.pwd = pwd;
+        }
+    }
+    
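+    /**
+     * Test double for BasicContainer: records the profiler and worker commands
+     * it is asked to run instead of launching real processes, and stubs out
+     * environment-specific lookups (topology conf, java command, framework
+     * classpath, java.library.path) so the assertions stay deterministic.
+     */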
+    public static class MockBasicContainer extends BasicContainer {
+        public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+                LocalAssignment assignment, LocalState localState,
+                String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd)
+                throws IOException {
+            super(type, conf, supervisorId, port, assignment, localState, workerId, topoConf, ops,
+                    profileCmd);
+        }
+
+        public final List<CommandRun> profileCmds = new ArrayList<>();
+        public final List<CommandRun> workerCmds = new ArrayList<>();
+        
+
+        @Override
+        protected Map<String, Object> readTopoConf() throws IOException {
+            return new HashMap<>();
+        }
+        
+        @Override
+        public void createNewWorkerId() {
+            super.createNewWorkerId();
+        }
+        
+        @Override
+        public List<String> substituteChildopts(Object value, int memOnheap) {
+            return super.substituteChildopts(value, memOnheap);
+        }
+               
+        @Override
+        protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
+                File targetDir) throws IOException, InterruptedException {
+            profileCmds.add(new CommandRun(command, env, targetDir));
+            return true;
+        }
+        
+        @Override
+        protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
+                ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+            workerCmds.add(new CommandRun(command, env, targetDir));
+        }
+        
+        @Override
+        protected String javaCmd(String cmd) {
+            //avoid system dependent things
+            return cmd;
+        }
+        
+        @Override
+        protected List<String> frameworkClasspath() {
+            // We are not really running anything, so keep the classpath
+            // simple to check against in assertions.
+            return Arrays.asList("FRAMEWORK_CP");
+        }
+        
+        @Override
+        protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
+            return "JLP";
+        }
+    }
+    
+    @Test
+    public void testCreateNewWorkerId() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, ls, null, new HashMap<String, Object>(), ops, "profile");
+        // Passing a null worker id to the constructor means the container generates a new one.
+        
+        assertNotNull(mc._workerId);
+        verify(ls).getApprovedWorkers();
+        Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
+        expectedNewState.put(mc._workerId, port);
+        verify(ls).setApprovedWorkers(expectedNewState);
+    }
+    
+    @Test
+    public void testRecovery() throws Exception {
+        final String topoId = "test_topology";
+        final String workerId = "myWorker";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put(workerId, port);
+        
+        LocalState ls = mock(LocalState.class);
+        when(ls.getApprovedWorkers()).thenReturn(workerState);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf, 
+                "SUPERVISOR", port, la, ls, null, new HashMap<String, Object>(), ops, "profile");
+        
+        assertEquals(workerId, mc._workerId);
+    }
+    
+    @Test
+    public void testRecoveryMiss() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put("somethingelse", port+1);
+        
+        LocalState ls = mock(LocalState.class);
+        when(ls.getApprovedWorkers()).thenReturn(workerState);
+        
+        try {
+            new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(), 
+                    "SUPERVISOR", port, la, ls, null, new HashMap<String, Object>(), null, "profile");
+            fail("Container recovered worker incorrectly");
+        } catch (ContainerRecoveryException e) {
+            //Expected
+        }
+    }
+    
+    @Test
+    public void testCleanUp() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String workerId = "worker-id";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put(workerId, port);
+        
+        LocalState ls = mock(LocalState.class);
+        when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, ls, workerId, new HashMap<String, Object>(), ops, "profile");
+        
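+        // cleanUp() is expected to clear the container's worker id and remove
+        // the worker's entry from the approved-workers map in LocalState.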
+        mc.cleanUp();
+        
+        assertNull(mc._workerId);
+        verify(ls).getApprovedWorkers();
+        Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
+        verify(ls).setApprovedWorkers(expectedNewState);
+    }
+    
+    @Test
+    public void testRunProfiling() throws Exception {
+        final long pid = 100;
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String workerId = "worker-id";
+        final String stormLocal = ContainerTest.asAbsPath("tmp", "testing");
+        final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, String.valueOf(port));
+        final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, "worker.pid");
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid));
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, ls, workerId, new HashMap<String, Object>(), ops, "profile");
+        
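+        // Each request below should produce exactly one profiler invocation;
+        // the worker pid is read from the worker.pid artifact mocked via slurpString above.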
+        //HEAP DUMP
+        ProfileRequest req = new ProfileRequest();
+        req.set_action(ProfileAction.JMAP_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        CommandRun cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JSTACK DUMP
+        req.set_action(ProfileAction.JSTACK_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //RESTART
+        req.set_action(ProfileAction.JVM_RESTART);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE DUMP
+        req.set_action(ProfileAction.JPROFILE_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE START (the JPROFILE_STOP action with stop == false starts profiling)
+        req.set_action(ProfileAction.JPROFILE_STOP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE STOP (the same action with stop == true stops profiling)
+        req.set_action(ProfileAction.JPROFILE_STOP);
+        
+        mc.runProfiling(req, true);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+    }
+    
+    private static void setSystemProp(String key, String value) {
+        if (value == null) {
+            System.clearProperty(key);
+        } else {
+            System.setProperty(key, value);
+        }
+    }
+    
+    private interface Run {
+        void run() throws Exception;
+    }
+    
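+    /**
+     * Runs the given body with the supplied system properties temporarily set
+     * (passed as alternating key/value pairs) and restores the original values
+     * afterwards, even if the body throws.
+     */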
+    private static void checkpoint(Run r, String ... newValues) throws Exception {
+        if (newValues.length % 2 != 0) {
+            throw new IllegalArgumentException("Parameters are of the form system property name, new value");
+        }
+        Map<String, String> orig = new HashMap<>();
+        try {
+            for (int index = 0; index < newValues.length; index += 2) {
+                String key = newValues[index];
+                String value = newValues[index + 1];
+                orig.put(key, System.getProperty(key));
+                setSystemProp(key, value);
+            }
+            r.run();
+        } finally {
+            for (Map.Entry<String, String> entry: orig.entrySet()) {
+                setSystemProp(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+    
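+    /**
+     * Like assertEquals on two lists, but compares element by element first so
+     * a mismatch reports the offending index along with both lists.
+     */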
+    private static <T> void assertListEquals(List<T> a, List<T> b) {
+        if (a == null) {
+            assertNull(b);
+            return;
+        }
+        if (b == null) {
+            assertNull(a);
+            return;
+        }
+        int commonLen = Math.min(a.size(), b.size());
+        for (int i = 0; i < commonLen; i++) {
+            assertEquals("at index "+i+"\n"+a+" !=\n"+b+"\n", a.get(i), b.get(i));
+        }
+        
+        assertEquals("size of lists don't match \n"+a+" !=\n"+b, a.size(), b.size());
+    }
+    
+    @Test
+    public void testLaunch() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home");
+        final String stormLogDir = ContainerTest.asFile(".", "target").getCanonicalPath();
+        final String workerId = "worker-id";
+        final String stormLocal = ContainerTest.asAbsPath("tmp", "storm-local");
+        final String distRoot = ContainerTest.asAbsPath(stormLocal, "supervisor", "stormdist", topoId);
+        final File stormcode = new File(distRoot, "stormcode.ser");
+        final File stormjar = new File(distRoot, "stormjar.jar");
+        final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf");
+        final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml");
+        final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId);
+        final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
+        
+        final StormTopology st = new StormTopology();
+        st.set_spouts(new HashMap<String, SpoutSpec>());
+        st.set_bolts(new HashMap<String, Bolt>());
+        st.set_state_spouts(new HashMap<String, StateSpoutSpec>());
+        byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st));
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        superConf.put(Config.STORM_LOG4J2_CONF_DIR, log4jdir);
+        superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
+        
+        final LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        final AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.slurp(stormcode)).thenReturn(serializedState);
+        
+        final LocalState ls = mock(LocalState.class);
+        
+        
+        checkpoint(new Run() {
+          @Override
+          public void run() throws Exception {
+            MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                    "SUPERVISOR", port, la, ls, workerId, new HashMap<String, Object>(), ops, "profile");
+
+            mc.launch();
+
+            assertEquals(1, mc.workerCmds.size());
+            CommandRun cmd = mc.workerCmds.get(0);
+            mc.workerCmds.clear();
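+            // The worker is launched through org.apache.storm.LogWriter: the outer
+            // java command runs LogWriter, and the arguments that follow it form the
+            // actual worker JVM command (org.apache.storm.daemon.worker).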
+            assertListEquals(Arrays.asList(
+                    "java",
+                    "-cp",
+                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                    "-Dlogging.sensitivity=S3",
+                    "-Dlogfile.name=worker.log",
+                    "-Dstorm.home=" + stormHome,
+                    "-Dworkers.artifacts=" + stormLocal,
+                    "-Dstorm.id=" + topoId,
+                    "-Dworker.id=" + workerId,
+                    "-Dworker.port=" + port,
+                    "-Dstorm.log.dir=" + stormLogDir,
+                    "-Dlog4j.configurationFile=" + workerConf,
+                    "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                    "-Dstorm.local.dir=" + stormLocal,
+                    "org.apache.storm.LogWriter",
+                    "java",
+                    "-server",
+                    "-Dlogging.sensitivity=S3",
+                    "-Dlogfile.name=worker.log",
+                    "-Dstorm.home=" + stormHome,
+                    "-Dworkers.artifacts=" + stormLocal,
+                    "-Dstorm.id=" + topoId,
+                    "-Dworker.id=" + workerId,
+                    "-Dworker.port=" + port,
+                    "-Dstorm.log.dir=" + stormLogDir,
+                    "-Dlog4j.configurationFile=" + workerConf,
+                    "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                    "-Dstorm.local.dir=" + stormLocal,
+                    "-Dtesting=true",
+                    "-Djava.library.path=JLP",
+                    "-Dstorm.conf.file=",
+                    "-Dstorm.options=",
+                    "-Djava.io.tmpdir="+workerTmpDir,
+                    "-cp",
+                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                    "org.apache.storm.daemon.worker", 
+                    topoId, 
+                    "SUPERVISOR",
+                    String.valueOf(port),
+                    workerId
+                    ), cmd.cmd);
+            assertEquals(new File(workerRoot), cmd.pwd);
+          }}, 
+          "storm.home", stormHome,
+          "storm.log.dir", stormLogDir);
+    }
+    
+    @Test
+    public void testSubstChildOpts() throws Exception {
+        String workerId = "w-01";
+        String topoId = "s-01";
+        int port = 9999;
+        int memOnheap = 512;
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, ls, workerId, new HashMap<String, Object>(), ops, "profile");
+        
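+        // substituteChildopts should replace the %ID%, %TOPOLOGY-ID%, %WORKER-ID%,
+        // %WORKER-PORT% and %HEAP-MEM% placeholders and split a single string of
+        // options into separate arguments.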
+        assertListEquals(Arrays.asList(
+                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+                "-Xms256m",
+                "-Xmx512m"),
+                mc.substituteChildopts("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m", memOnheap));
+        
+        assertListEquals(Arrays.asList(
+                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+                "-Xms256m",
+                "-Xmx512m"),
+                mc.substituteChildopts(Arrays.asList("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log","-Xms256m","-Xmx%HEAP-MEM%m"), memOnheap));
+        
+        assertListEquals(Collections.<String>emptyList(), 
+                mc.substituteChildopts(null));
+    }
+}

