storm-commits mailing list archives

From ptgo...@apache.org
Subject [07/12] git commit: Merge branch '0.9.0-windows' of github.com:davidlao2k/storm into storm-windows
Date Thu, 19 Dec 2013 06:25:19 GMT
Merge branch '0.9.0-windows' of github.com:davidlao2k/storm into storm-windows

Conflicts:
	storm-core/src/clj/backtype/storm/util.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/697849dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/697849dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/697849dc

Branch: refs/heads/master
Commit: 697849dcb4ebecd086a42e9fa415d9ed51d9c722
Parents: 7e40f9f 0e46397
Author: P. Taylor Goetz <ptgoetz@gmail.com>
Authored: Sun Dec 15 15:25:34 2013 -0500
Committer: P. Taylor Goetz <ptgoetz@gmail.com>
Committed: Sun Dec 15 15:25:34 2013 -0500

----------------------------------------------------------------------
 bin/storm-config.cmd                            |  95 ++++++++++
 bin/storm.cmd                                   | 186 +++++++++++++++++++
 storm-core/src/clj/backtype/storm/config.clj    |  46 ++---
 .../clj/backtype/storm/daemon/supervisor.clj    |  37 +++-
 storm-core/src/clj/backtype/storm/testing.clj   |   6 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   6 +-
 storm-core/src/clj/backtype/storm/util.clj      |  18 +-
 7 files changed, 356 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
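
The recurring change in the Clojure diffs below is replacing hard-coded
"/" path joins with a file-path-separator var so the daemons build valid
paths on Windows. file-path-separator comes from backtype.storm.util
(util.clj, where the merge conflict was, is not included in this
message); a minimal sketch of the idiom, assuming it is simply bound to
the JVM's platform separator:

    ;; Assumption: backtype.storm.util defines something equivalent to
    ;; this (util.clj itself is not shown below).
    (def file-path-separator java.io.File/separator)

    ;; The join idiom applied throughout config.clj and supervisor.clj:
    (defn nimbus-dir-sketch [local-dir]
      (str local-dir file-path-separator "nimbus"))

    ;; (nimbus-dir-sketch "/mnt/storm") ;=> "/mnt/storm/nimbus" on Unix
    ;; (nimbus-dir-sketch "C:\\storm")  ;=> "C:\\storm\\nimbus" on Windows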


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/config.clj
index 7db986f,0000000..7525496
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@@ -1,222 -1,0 +1,222 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns backtype.storm.config
 +  (:import [java.io FileReader File])
 +  (:import [backtype.storm Config ConfigValidation$FieldValidator])
 +  (:import [backtype.storm.utils Utils LocalState])
 +  (:import [org.apache.commons.io FileUtils])
 +  (:require [clojure [string :as str]])
 +  (:use [backtype.storm util])
 +  )
 +
 +(def RESOURCES-SUBDIR "resources")
 +
 +(defn- clojure-config-name [name]
 +  (.replace (.toUpperCase name) "_" "-"))
 +
 +;; define clojure constants for every configuration parameter
 +(doseq [f (seq (.getFields Config))]
 +  (let [name (.getName f)
 +        new-name (clojure-config-name name)]
 +    (eval
 +      `(def ~(symbol new-name) (. Config ~(symbol name))))
 +      ))
 +
 +(def ALL-CONFIGS
 +  (dofor [f (seq (.getFields Config))]
 +         (.get f nil)
 +         ))
 +
 +(defmulti get-FieldValidator class-selector)
 +
 +(defmethod get-FieldValidator nil [_]
 +  (throw (IllegalArgumentException. "Cannot validate a nil field.")))
 +
 +(defmethod get-FieldValidator
 +  ConfigValidation$FieldValidator [validator] validator)
 +
 +(defmethod get-FieldValidator Object [klass]
 +  {:pre [(not (nil? klass))]}
 +  (reify ConfigValidation$FieldValidator
 +    (validateField [this name v]
 +      (if (and (not (nil? v))
 +               (not (instance? klass v)))
 +        (throw (IllegalArgumentException.
 +                 (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
 +
 +;; Create a mapping of config-string -> validator
 +;; Config fields must have a _SCHEMA field defined
 +(def CONFIG-SCHEMA-MAP
 +  (->> (.getFields Config)
 +          (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
 +          (map (fn [f] [(.get f nil) (get-FieldValidator
 +                                       (-> Config
 +                                         (.getField (str (.getName f) "_SCHEMA"))
 +                                         (.get nil)))]))
 +          (into {})))
 +
 +(defn cluster-mode [conf & args]
 +  (keyword (conf STORM-CLUSTER-MODE)))
 +
 +(defn local-mode? [conf]
 +  (let [mode (conf STORM-CLUSTER-MODE)]
 +    (condp = mode
 +      "local" true
 +      "distributed" false
 +      (throw (IllegalArgumentException.
 +                (str "Illegal cluster mode in conf: " mode)))
 +      )))
 +
 +(defn sampling-rate [conf]
 +  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
 +       (/ 1)
 +       int))
 +
 +(defn mk-stats-sampler [conf]
 +  (even-sampler (sampling-rate conf)))
 +
 +; storm.zookeeper.servers:
 +;     - "server1"
 +;     - "server2"
 +;     - "server3"
 +; nimbus.host: "master"
 +; 
 +; ########### These all have default values as shown
 +; 
 +; ### storm.* configs are general configurations
 +; # the local dir is where jars are kept
 +; storm.local.dir: "/mnt/storm"
 +; storm.zookeeper.port: 2181
 +; storm.zookeeper.root: "/storm"
 +
 +(defn read-default-config []
 +  (clojurify-structure (Utils/readDefaultConfig)))
 +
 +(defn- validate-configs-with-schemas [conf]
 +  (doseq [[k v] conf
 +         :let [schema (CONFIG-SCHEMA-MAP k)]]
 +    (if (not (nil? schema))
 +      (.validateField schema k v))))
 +
 +(defn read-storm-config []
 +  (let [
 +        conf (clojurify-structure (Utils/readStormConfig))]
 +    (validate-configs-with-schemas conf)
 +    conf))
 +
 +(defn read-yaml-config [name]
 +  (let [conf (clojurify-structure (Utils/findAndReadConfigFile name true))]
 +    (validate-configs-with-schemas conf)
 +    conf))
 +
 +(defn master-local-dir [conf]
-   (let [ret (str (conf STORM-LOCAL-DIR) "/nimbus")]
++  (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "nimbus")]
 +    (FileUtils/forceMkdir (File. ret))
 +    ret
 +    ))
 +
 +(defn master-stormdist-root
 +  ([conf]
-      (str (master-local-dir conf) "/stormdist"))
++     (str (master-local-dir conf) file-path-separator "stormdist"))
 +  ([conf storm-id]
-      (str (master-stormdist-root conf) "/" storm-id)))
++     (str (master-stormdist-root conf) file-path-separator storm-id)))
 +
 +(defn master-stormjar-path [stormroot]
-   (str stormroot "/stormjar.jar"))
++  (str stormroot file-path-separator "stormjar.jar"))
 +
 +(defn master-stormcode-path [stormroot]
-   (str stormroot "/stormcode.ser"))
++  (str stormroot file-path-separator "stormcode.ser"))
 +
 +(defn master-stormconf-path [stormroot]
-   (str stormroot "/stormconf.ser"))
++  (str stormroot file-path-separator "stormconf.ser"))
 +
 +(defn master-inbox [conf]
-   (let [ret (str (master-local-dir conf) "/inbox")]
++  (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
 +    (FileUtils/forceMkdir (File. ret))
 +    ret ))
 +
 +(defn master-inimbus-dir [conf]
-   (str (master-local-dir conf) "/inimbus"))
++  (str (master-local-dir conf) file-path-separator "inimbus"))
 +
 +(defn supervisor-local-dir [conf]
-   (let [ret (str (conf STORM-LOCAL-DIR) "/supervisor")]
++  (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "supervisor")]
 +    (FileUtils/forceMkdir (File. ret))
 +    ret
 +    ))
 +
 +(defn supervisor-isupervisor-dir [conf]
-   (str (supervisor-local-dir conf) "/isupervisor"))
++  (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
 +
 +(defn supervisor-stormdist-root
-   ([conf] (str (supervisor-local-dir conf) "/stormdist"))
++  ([conf] (str (supervisor-local-dir conf) file-path-separator "stormdist"))
 +  ([conf storm-id]
-       (str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id))))
++      (str (supervisor-stormdist-root conf) file-path-separator (java.net.URLEncoder/encode storm-id))))
 +
 +(defn supervisor-stormjar-path [stormroot]
-   (str stormroot "/stormjar.jar"))
++  (str stormroot file-path-separator "stormjar.jar"))
 +
 +(defn supervisor-stormcode-path [stormroot]
-   (str stormroot "/stormcode.ser"))
++  (str stormroot file-path-separator "stormcode.ser"))
 +
 +(defn supervisor-stormconf-path [stormroot]
-   (str stormroot "/stormconf.ser"))
++  (str stormroot file-path-separator "stormconf.ser"))
 +
 +(defn supervisor-tmp-dir [conf]
-   (let [ret (str (supervisor-local-dir conf) "/tmp")]
++  (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
 +    (FileUtils/forceMkdir (File. ret))
 +    ret ))
 +
 +(defn supervisor-storm-resources-path [stormroot]
-   (str stormroot "/" RESOURCES-SUBDIR))
++  (str stormroot file-path-separator RESOURCES-SUBDIR))
 +
 +(defn ^LocalState supervisor-state [conf]
-   (LocalState. (str (supervisor-local-dir conf) "/localstate")))
++  (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
 +
 +(defn read-supervisor-storm-conf [conf storm-id]
 +  (let [stormroot (supervisor-stormdist-root conf storm-id)
 +        conf-path (supervisor-stormconf-path stormroot)
 +        topology-path (supervisor-stormcode-path stormroot)]
 +    (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
 +    ))
 +
 +(defn read-supervisor-topology [conf storm-id]
 +  (let [stormroot (supervisor-stormdist-root conf storm-id)
 +        topology-path (supervisor-stormcode-path stormroot)]
 +    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
 +    ))
 +
 +(defn worker-root
 +  ([conf]
-      (str (conf STORM-LOCAL-DIR) "/workers"))
++     (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
 +  ([conf id]
-      (str (worker-root conf) "/" id)))
++     (str (worker-root conf) file-path-separator id)))
 +
 +(defn worker-pids-root
 +  [conf id]
-   (str (worker-root conf id) "/pids"))
++  (str (worker-root conf id) file-path-separator "pids"))
 +
 +(defn worker-pid-path [conf id pid]
-   (str (worker-pids-root conf id) "/" pid))
++  (str (worker-pids-root conf id) file-path-separator pid))
 +
 +(defn worker-heartbeats-root
 +  [conf id]
-   (str (worker-root conf id) "/heartbeats"))
++  (str (worker-root conf id) file-path-separator "heartbeats"))
 +
 +;; workers heartbeat here with pid and timestamp
 +;; if supervisor stops receiving heartbeat, it kills and restarts the process
 +;; in local mode, keep a global map of ids to threads for simulating process management
 +(defn ^LocalState worker-state  [conf id]
 +  (LocalState. (worker-heartbeats-root conf id)))

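A note on the validation machinery kept in config.clj above: every
backtype.storm.Config field is paired with a <name>_SCHEMA field, and
CONFIG-SCHEMA-MAP turns the pairing into a map of config-string ->
FieldValidator. A hedged usage sketch (assumes storm-core is on the
classpath and that topology.workers is schema'd as a Number in Config):

    (require '[backtype.storm.config :as cfg])

    (let [conf (cfg/read-storm-config)]
      ;; validate-configs-with-schemas is private, hence the var call.
      ;; A numeric value passes silently:
      (#'cfg/validate-configs-with-schemas (assoc conf "topology.workers" 4))
      ;; A mis-typed value throws IllegalArgumentException:
      ;; "field topology.workers 'four' must be a 'java.lang.Number'"
      (#'cfg/validate-configs-with-schemas (assoc conf "topology.workers" "four")))
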
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index af47c26,0000000..0d9cc18
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -1,520 -1,0 +1,543 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns backtype.storm.daemon.supervisor
 +  (:import [backtype.storm.scheduler ISupervisor])
 +  (:use [backtype.storm bootstrap])
 +  (:use [backtype.storm.daemon common])
 +  (:require [backtype.storm.daemon [worker :as worker]])
 +  (:gen-class
 +    :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
 +
 +(bootstrap)
 +
 +(defmulti download-storm-code cluster-mode)
 +(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
 +
 +;; used as part of a map from port to this
 +(defrecord LocalAssignment [storm-id executors])
 +
 +(defprotocol SupervisorDaemon
 +  (get-id [this])
 +  (get-conf [this])
 +  (shutdown-all-workers [this])
 +  )
 +
 +(defn- assignments-snapshot [storm-cluster-state callback]
 +  (let [storm-ids (.assignments storm-cluster-state callback)]
 +     (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)})
 +          (apply merge)
 +          (filter-val not-nil?)
 +          )))
 +
 +(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
 +  (let [assignment (get assignments-snapshot storm-id)
 +        my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
 +                           (:executor->node+port assignment))
 +        port-executors (apply merge-with
 +                          concat
 +                          (for [[executor [_ port]] my-executors]
 +                            {port [executor]}
 +                            ))]
 +    (into {} (for [[port executors] port-executors]
 +               ;; need to cast to int b/c it might be a long (due to how yaml parses things)
 +               ;; doall is to avoid serialization/deserialization problems with lazy seqs
 +               [(Integer. port) (LocalAssignment. storm-id (doall executors))]
 +               ))))
 +
 +
 +(defn- read-assignments
 +  "Returns map from port to struct containing :storm-id and :executors"
 +  [assignments-snapshot assignment-id]
 +  (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
 +       (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
 +
 +(defn- read-storm-code-locations
 +  [assignments-snapshot]
 +  (map-val :master-code-dir assignments-snapshot))
 +
 +(defn- read-downloaded-storm-ids [conf]
 +  (map #(java.net.URLDecoder/decode %) (read-dir-contents (supervisor-stormdist-root conf)))
 +  )
 +
 +(defn read-worker-heartbeat [conf id]
 +  (let [local-state (worker-state conf id)]
 +    (.get local-state LS-WORKER-HEARTBEAT)
 +    ))
 +
 +
 +(defn my-worker-ids [conf]
 +  (read-dir-contents (worker-root conf)))
 +
 +(defn read-worker-heartbeats
 +  "Returns map from worker id to heartbeat"
 +  [conf]
 +  (let [ids (my-worker-ids conf)]
 +    (into {}
 +      (dofor [id ids]
 +        [id (read-worker-heartbeat conf id)]))
 +    ))
 +
 +
 +(defn matches-an-assignment? [worker-heartbeat assigned-executors]
 +  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
 +    (and local-assignment
 +         (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
 +         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
 +            (set (:executors local-assignment))))))
 +
 +(defn read-allocated-workers
 +  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
 +  [supervisor assigned-executors now]
 +  (let [conf (:conf supervisor)
 +        ^LocalState local-state (:local-state supervisor)
 +        id->heartbeat (read-worker-heartbeats conf)
 +        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
 +    (into
 +     {}
 +     (dofor [[id hb] id->heartbeat]
 +            (let [state (cond
++                         (not hb)
++                           :not-started
 +                         (or (not (contains? approved-ids id))
 +                             (not (matches-an-assignment? hb assigned-executors)))
 +                           :disallowed
-                          (not hb)
-                            :not-started
 +                         (> (- now (:time-secs hb))
 +                            (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
 +                           :timed-out
 +                         true
 +                           :valid)]
 +              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
 +              [id [state hb]]
 +              ))
 +     )))
 +
 +(defn- wait-for-worker-launch [conf id start-time]
 +  (let [state (worker-state conf id)]    
 +    (loop []
 +      (let [hb (.get state LS-WORKER-HEARTBEAT)]
 +        (when (and
 +               (not hb)
 +               (<
 +                (- (current-time-secs) start-time)
 +                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
 +                ))
 +          (log-message id " still hasn't started")
 +          (Time/sleep 500)
 +          (recur)
 +          )))
 +    (when-not (.get state LS-WORKER-HEARTBEAT)
 +      (log-message "Worker " id " failed to start")
 +      )))
 +
 +(defn- wait-for-workers-launch [conf ids]
 +  (let [start-time (current-time-secs)]
 +    (doseq [id ids]
 +      (wait-for-worker-launch conf id start-time))
 +    ))
 +
 +(defn generate-supervisor-id []
 +  (uuid))
 +
 +(defn try-cleanup-worker [conf id]
 +  (try
 +    (rmr (worker-heartbeats-root conf id))
 +    ;; this avoids a race condition with worker or subprocess writing pid around same time
 +    (rmpath (worker-pids-root conf id))
 +    (rmpath (worker-root conf id))
 +  (catch RuntimeException e
 +    (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
-     )))
++    )
++  (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
++  (catch java.io.IOException e (log-message (.getMessage e)))
++    ))
 +
 +(defn shutdown-worker [supervisor id]
 +  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
 +  (let [conf (:conf supervisor)
 +        pids (read-dir-contents (worker-pids-root conf id))
 +        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
 +    (when thread-pid
 +      (psim/kill-process thread-pid))
 +    (doseq [pid pids]
 +      (ensure-process-killed! pid)
-       (rmpath (worker-pid-path conf id pid))
++      (try
++        (rmpath (worker-pid-path conf id pid))
++        (catch Exception e)) ;; on windows, the supervisor may still hold the lock on the worker directory
 +      )
 +    (try-cleanup-worker conf id))
 +  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 +
 +(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
 +  {:conf conf
 +   :shared-context shared-context
 +   :isupervisor isupervisor
 +   :active (atom true)
 +   :uptime (uptime-computer)
 +   :worker-thread-pids-atom (atom {})
 +   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
 +   :local-state (supervisor-state conf)
 +   :supervisor-id (.getSupervisorId isupervisor)
 +   :assignment-id (.getAssignmentId isupervisor)
 +   :my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
 +                  (conf STORM-LOCAL-HOSTNAME)
 +                  (local-hostname))
 +   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
 +   :timer (mk-timer :kill-fn (fn [t]
 +                               (log-error t "Error when processing event")
 +                               (halt-process! 20 "Error when processing an event")
 +                               ))
 +   })
 +
 +(defn sync-processes [supervisor]
 +  (let [conf (:conf supervisor)
 +        ^LocalState local-state (:local-state supervisor)
 +        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
 +        now (current-time-secs)
 +        allocated (read-allocated-workers supervisor assigned-executors now)
 +        keepers (filter-val
 +                 (fn [[state _]] (= state :valid))
 +                 allocated)
 +        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
 +        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
 +        new-worker-ids (into
 +                        {}
 +                        (for [port (keys reassign-executors)]
 +                          [port (uuid)]))
 +        ]
 +    ;; 1. to kill are those in allocated that are dead or disallowed
 +    ;; 2. kill the ones that should be dead
 +    ;;     - read pids, kill -9 and individually remove file
 +    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
 +    ;; 3. of the rest, figure out what assignments aren't yet satisfied
 +    ;; 4. generate new worker ids, write new "approved workers" to LS
 +    ;; 5. create local dir for worker id
 +    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
 +    ;; 6. wait for workers launch
 +  
 +    (log-debug "Syncing processes")
 +    (log-debug "Assigned executors: " assigned-executors)
 +    (log-debug "Allocated: " allocated)
 +    (doseq [[id [state heartbeat]] allocated]
 +      (when (not= :valid state)
 +        (log-message
 +         "Shutting down and clearing state for id " id
 +         ". Current supervisor time: " now
 +         ". State: " state
 +         ", Heartbeat: " (pr-str heartbeat))
 +        (shutdown-worker supervisor id)
 +        ))
 +    (doseq [id (vals new-worker-ids)]
 +      (local-mkdirs (worker-pids-root conf id)))
 +    (.put local-state LS-APPROVED-WORKERS
 +          (merge
 +           (select-keys (.get local-state LS-APPROVED-WORKERS)
 +                        (keys keepers))
 +           (zipmap (vals new-worker-ids) (keys new-worker-ids))
 +           ))
 +    (wait-for-workers-launch
 +     conf
 +     (dofor [[port assignment] reassign-executors]
 +       (let [id (new-worker-ids port)]
 +         (log-message "Launching worker with assignment "
 +                      (pr-str assignment)
 +                      " for this supervisor "
 +                      (:supervisor-id supervisor)
 +                      " on port "
 +                      port
 +                      " with id "
 +                      id
 +                      )
 +         (launch-worker supervisor
 +                        (:storm-id assignment)
 +                        port
 +                        id)
 +         id)))
 +    ))
 +
 +(defn assigned-storm-ids-from-port-assignments [assignment]
 +  (->> assignment
 +       vals
 +       (map :storm-id)
 +       set))
 +
++(defn shutdown-disallowed-workers [supervisor]
++  (let [conf (:conf supervisor)
++        ^LocalState local-state (:local-state supervisor)
++        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
++        now (current-time-secs)
++        allocated (read-allocated-workers supervisor assigned-executors now)
++        disallowed (keys (filter-val
++                                  (fn [[state _]] (= state :disallowed))
++                                  allocated))]
++    (log-debug "Allocated workers " allocated)
++    (log-debug "Disallowed workers " disallowed)
++    (doseq [id disallowed]
++      (shutdown-worker supervisor id))
++    ))
++
 +(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
 +  (fn this []
 +    (let [conf (:conf supervisor)
 +          storm-cluster-state (:storm-cluster-state supervisor)
 +          ^ISupervisor isupervisor (:isupervisor supervisor)
 +          ^LocalState local-state (:local-state supervisor)
 +          sync-callback (fn [& ignored] (.add event-manager this))
 +          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
 +          storm-code-map (read-storm-code-locations assignments-snapshot)
 +          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
 +          all-assignment (read-assignments
 +                           assignments-snapshot
 +                           (:assignment-id supervisor))
 +          new-assignment (->> all-assignment
 +                              (filter-key #(.confirmAssigned isupervisor %)))
 +          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
 +          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
 +      (log-debug "Synchronizing supervisor")
 +      (log-debug "Storm code map: " storm-code-map)
 +      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
 +      (log-debug "All assignment: " all-assignment)
 +      (log-debug "New assignment: " new-assignment)
 +      
 +      ;; download code first
 +      ;; This might take awhile
 +      ;;   - should this be done separately from usual monitoring?
 +      ;; should we only download when topology is assigned to this supervisor?
 +      (doseq [[storm-id master-code-dir] storm-code-map]
 +        (when (and (not (downloaded-storm-ids storm-id))
 +                   (assigned-storm-ids storm-id))
 +          (log-message "Downloading code for storm id "
 +             storm-id
 +             " from "
 +             master-code-dir)
 +          (download-storm-code conf storm-id master-code-dir)
 +          (log-message "Finished downloading code for storm id "
 +             storm-id
 +             " from "
 +             master-code-dir)
 +          ))
 +
 +      (log-debug "Writing new assignment "
 +                 (pr-str new-assignment))
 +      (doseq [p (set/difference (set (keys existing-assignment))
 +                                (set (keys new-assignment)))]
 +        (.killedWorker isupervisor (int p)))
 +      (.assigned isupervisor (keys new-assignment))
 +      (.put local-state
 +            LS-LOCAL-ASSIGNMENTS
 +            new-assignment)
 +      (reset! (:curr-assignment supervisor) new-assignment)
 +      ;; remove any downloaded code that's no longer assigned or active
 +      ;; important that this happens after setting the local assignment so that
 +      ;; synchronize-supervisor doesn't try to launch workers for which the
 +      ;; resources don't exist
++      (if on-windows? (shutdown-disallowed-workers supervisor))
 +      (doseq [storm-id downloaded-storm-ids]
 +        (when-not (assigned-storm-ids storm-id)
 +          (log-message "Removing code for storm id "
 +                       storm-id)
-           (rmr (supervisor-stormdist-root conf storm-id))
++          (try
++            (rmr (supervisor-stormdist-root conf storm-id))
++            (catch Exception e (log-message (.getMessage e))))
 +          ))
 +      (.add processes-event-manager sync-processes)
 +      )))
 +
 +;; in local state, supervisor stores who its current assignments are
 +;; another thread launches events to restart any dead processes if necessary
 +(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
 +  (log-message "Starting Supervisor with conf " conf)
 +  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
 +  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
 +  (let [supervisor (supervisor-data conf shared-context isupervisor)
 +        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]                         
 +        sync-processes (partial sync-processes supervisor)
 +        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
 +        heartbeat-fn (fn [] (.supervisor-heartbeat!
 +                               (:storm-cluster-state supervisor)
 +                               (:supervisor-id supervisor)
 +                               (SupervisorInfo. (current-time-secs)
 +                                                (:my-hostname supervisor)
 +                                                (:assignment-id supervisor)
 +                                                (keys @(:curr-assignment supervisor))
 +                                                ;; used ports
 +                                                (.getMetadata isupervisor)
 +                                                (conf SUPERVISOR-SCHEDULER-META)
 +                                                ((:uptime supervisor)))))]
 +    (heartbeat-fn)
 +    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
 +    (schedule-recurring (:timer supervisor)
 +                        0
 +                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
 +                        heartbeat-fn)
 +    (when (conf SUPERVISOR-ENABLE)
 +      ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
 +      ;; to date even if callbacks don't all work exactly right
 +      (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
 +      (schedule-recurring (:timer supervisor)
 +                          0
 +                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
 +                          (fn [] (.add processes-event-manager sync-processes))))
 +    (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
 +    (reify
 +     Shutdownable
 +     (shutdown [this]
 +               (log-message "Shutting down supervisor " (:supervisor-id supervisor))
 +               (reset! (:active supervisor) false)
 +               (cancel-timer (:timer supervisor))
 +               (.shutdown event-manager)
 +               (.shutdown processes-event-manager)
 +               (.disconnect (:storm-cluster-state supervisor)))
 +     SupervisorDaemon
 +     (get-conf [this]
 +       conf)
 +     (get-id [this]
 +       (:supervisor-id supervisor))
 +     (shutdown-all-workers [this]
 +       (let [ids (my-worker-ids conf)]
 +         (doseq [id ids]
 +           (shutdown-worker supervisor id)
 +           )))
 +     DaemonCommon
 +     (waiting? [this]
 +       (or (not @(:active supervisor))
 +           (and
 +            (timer-waiting? (:timer supervisor))
 +            (every? (memfn waiting?) managers)))
 +           ))))
 +
 +(defn kill-supervisor [supervisor]
 +  (.shutdown supervisor)
 +  )
 +
 +;; distributed implementation
 +
 +(defmethod download-storm-code
 +    :distributed [conf storm-id master-code-dir]
 +    ;; Downloading to permanent location is atomic
-     (let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid))
++    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
 +          stormroot (supervisor-stormdist-root conf storm-id)]
 +      (FileUtils/forceMkdir (File. tmproot))
 +      
 +      (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
 +      (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
 +      (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
 +      (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
 +      (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
 +      ))
 +
 +
 +(defmethod launch-worker
 +    :distributed [supervisor storm-id port worker-id]
 +    (let [conf (:conf supervisor)
 +          storm-home (System/getProperty "storm.home")
 +          stormroot (supervisor-stormdist-root conf storm-id)
 +          stormjar (supervisor-stormjar-path stormroot)
 +          storm-conf (read-supervisor-storm-conf conf storm-id)
 +          classpath (add-to-classpath (current-classpath) [stormjar])
 +          childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
 +                                 "%ID%"
 +                                 (str port))
 +          logfilename (str "worker-" port ".log")
 +          command (str "java -server " childopts
 +                       " -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
 +                       " -Dlogfile.name=" logfilename
 +                       " -Dstorm.home=" storm-home
 +                       " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml"
 +                       " -Dstorm.id=" storm-id
 +                       " -Dworker.id=" worker-id
 +                       " -Dworker.port=" port
 +                       " -cp " classpath " backtype.storm.daemon.worker "
 +                       (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
 +                       " " port " " worker-id)]
 +      (log-message "Launching worker with command: " command)
 +      (launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)})
 +      ))
 +
 +;; local implementation
 +
 +(defn resources-jar []
 +  (->> (.split (current-classpath) File/pathSeparator)
 +       (filter #(.endsWith  % ".jar"))
 +       (filter #(zip-contains-dir? % RESOURCES-SUBDIR))
 +       first ))
 +
 +(defmethod download-storm-code
 +    :local [conf storm-id master-code-dir]
 +  (let [stormroot (supervisor-stormdist-root conf storm-id)]
 +      (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
 +      (let [classloader (.getContextClassLoader (Thread/currentThread))
 +            resources-jar (resources-jar)
 +            url (.getResource classloader RESOURCES-SUBDIR)
-             target-dir (str stormroot "/" RESOURCES-SUBDIR)]
++            target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
 +            (cond
 +              resources-jar
 +              (do
 +                (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
 +                (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
 +              url
 +              (do
 +                (log-message "Copying resources at " (str url) " to " target-dir)
 +                (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir))
 +                ))
 +            )))
 +
 +(defmethod launch-worker
 +    :local [supervisor storm-id port worker-id]
 +    (let [conf (:conf supervisor)
 +          pid (uuid)
 +          worker (worker/mk-worker conf
 +                                   (:shared-context supervisor)
 +                                   storm-id
 +                                   (:assignment-id supervisor)
 +                                   port
 +                                   worker-id)]
 +      (psim/register-process pid worker)
 +      (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
 +      ))
 +
 +(defn -launch [supervisor]
 +  (let [conf (read-storm-config)]
 +    (validate-distributed-mode! conf)
 +    (mk-supervisor conf nil supervisor)))
 +
 +(defn standalone-supervisor []
 +  (let [conf-atom (atom nil)
 +        id-atom (atom nil)]
 +    (reify ISupervisor
 +      (prepare [this conf local-dir]
 +        (reset! conf-atom conf)
 +        (let [state (LocalState. local-dir)
 +              curr-id (if-let [id (.get state LS-ID)]
 +                        id
 +                        (generate-supervisor-id))]
 +          (.put state LS-ID curr-id)
 +          (reset! id-atom curr-id))
 +        )
 +      (confirmAssigned [this port]
 +        true)
 +      (getMetadata [this]
 +        (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
 +      (getSupervisorId [this]
 +        @id-atom)
 +      (getAssignmentId [this]
 +        @id-atom)
 +      (killedWorker [this port]
 +        )
 +      (assigned [this ports]
 +        ))))
 +
 +(defn -main []
 +  (-launch (standalone-supervisor)))

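Two Windows-specific patterns in supervisor.clj above are worth calling
out. First, deletions (rmpath/rmr) are wrapped in try/catch because
another process may still hold a lock on the file; the failure is logged
and cleanup is retried on a later sync instead of crashing the
supervisor loop. Second, synchronize-supervisor now calls
shutdown-disallowed-workers when on-windows? is true, and
read-allocated-workers classifies a worker with no heartbeat as
:not-started before testing :disallowed, so that never-launched workers
are not shut down as disallowed. on-windows? itself lives in util.clj,
which is not shown in this message; a plausible definition, plus the
guarded-delete pattern as a standalone sketch:

    ;; Assumption: util.clj defines on-windows? roughly like this
    ;; (the OS environment variable is "Windows_NT" on modern Windows).
    (def on-windows?
      (= "Windows_NT" (System/getenv "OS")))

    ;; The guarded-delete pattern used around rmpath/rmr above:
    ;; log the failure and let the next sync retry, rather than throw.
    (defn rm-quietly-sketch [rm-fn path log-fn]
      (try
        (rm-fn path)
        (catch Exception e
          (log-fn (.getMessage e)))))
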
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/697849dc/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/testing.clj
index 94469e5,0000000..f17873f
mode 100644,000000..100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@@ -1,617 -1,0 +1,619 @@@
 +;; Licensed to the Apache Software Foundation (ASF) under one
 +;; or more contributor license agreements.  See the NOTICE file
 +;; distributed with this work for additional information
 +;; regarding copyright ownership.  The ASF licenses this file
 +;; to you under the Apache License, Version 2.0 (the
 +;; "License"); you may not use this file except in compliance
 +;; with the License.  You may obtain a copy of the License at
 +;;
 +;; http://www.apache.org/licenses/LICENSE-2.0
 +;;
 +;; Unless required by applicable law or agreed to in writing, software
 +;; distributed under the License is distributed on an "AS IS" BASIS,
 +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +;; See the License for the specific language governing permissions and
 +;; limitations under the License.
 +(ns backtype.storm.testing
 +  (:require [backtype.storm.daemon
 +             [nimbus :as nimbus]
 +             [supervisor :as supervisor]
 +             [common :as common]
 +             [worker :as worker]
 +             [executor :as executor]])
 +  (:require [backtype.storm [process-simulator :as psim]])
 +  (:import [org.apache.commons.io FileUtils])
 +  (:import [java.io File])
 +  (:import [java.util HashMap ArrayList])
 +  (:import [java.util.concurrent.atomic AtomicInteger])
 +  (:import [java.util.concurrent ConcurrentHashMap])
 +  (:import [backtype.storm.utils Time Utils RegisteredGlobalState])
 +  (:import [backtype.storm.tuple Fields Tuple TupleImpl])
 +  (:import [backtype.storm.task TopologyContext])
 +  (:import [backtype.storm.generated GlobalStreamId Bolt KillOptions])
 +  (:import [backtype.storm.testing FeederSpout FixedTupleSpout FixedTuple
 +            TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
 +            TestWordSpout MemoryTransactionalSpout])
 +  (:import [backtype.storm.transactional TransactionalSpoutCoordinator])
 +  (:import [backtype.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
 +  (:import [backtype.storm.tuple Tuple])
 +  (:import [backtype.storm.generated StormTopology])
 +  (:import [backtype.storm.task TopologyContext])
 +  (:require [backtype.storm [zookeeper :as zk]])
 +  (:require [backtype.storm.messaging.loader :as msg-loader])
 +  (:require [backtype.storm.daemon.acker :as acker])
 +  (:use [backtype.storm cluster util thrift config log]))
 +
 +(defn feeder-spout [fields]
 +  (FeederSpout. (Fields. fields)))
 +
 +(defn local-temp-path []
-   (str (System/getProperty "java.io.tmpdir") "/" (uuid)))
++  (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid)))
 +
 +(defn delete-all [paths]
 +  (dorun
 +    (for [t paths]
 +      (if (.exists (File. t))
 +        (FileUtils/forceDelete (File. t))
 +        ))))
 +
 +(defmacro with-local-tmp [[& tmp-syms] & body]
 +  (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)]
 +    `(let [~@tmp-paths]
 +      (try
 +        ~@body
 +      (finally
 +       (delete-all ~(vec tmp-syms)))
 +      ))
 +    ))
 +
 +(defn start-simulating-time! []
 +  (Time/startSimulating))
 +
 +(defn stop-simulating-time! []
 +  (Time/stopSimulating))
 +
 +(defmacro with-simulated-time [& body]
 +  `(do
 +     (start-simulating-time!)
 +     (let [ret# (do ~@body)]
 +       (stop-simulating-time!)
 +       ret#
 +       )))
 +
 +(defn advance-time-ms! [ms]
 +  (Time/advanceTime ms))
 +
 +(defn advance-time-secs! [secs]
 +  (advance-time-ms! (* (long secs) 1000)))
 +
 +
 +(defnk add-supervisor [cluster-map :ports 2 :conf {} :id nil]
 +  (let [tmp-dir (local-temp-path)
 +        port-ids (if (sequential? ports) ports (doall (repeatedly ports (:port-counter cluster-map))))
 +        supervisor-conf (merge (:daemon-conf cluster-map)
 +                               conf
 +                               {STORM-LOCAL-DIR tmp-dir
 +                                SUPERVISOR-SLOTS-PORTS port-ids
 +                               })
 +        id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
 +        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
 +    (swap! (:supervisors cluster-map) conj daemon)
 +    (swap! (:tmp-dirs cluster-map) conj tmp-dir)
 +    daemon
 +    ))
 +
 +(defn mk-shared-context [conf]
 +  (if-not (conf STORM-LOCAL-MODE-ZMQ)
 +    (msg-loader/mk-local-context)
 +    ))
 +
 +;; returns map containing cluster info
 +;; local dir is always overridden in maps
 +;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 +;; if need to customize amt of ports more, can use add-supervisor calls afterwards
 +(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1]
 +  (let [zk-tmp (local-temp-path)
 +        [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
 +        daemon-conf (merge (read-storm-config)
 +                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
 +                            ZMQ-LINGER-MILLIS 0
 +                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
 +                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
 +                            }
 +                           daemon-conf
 +                           {STORM-CLUSTER-MODE "local"
 +                            STORM-ZOOKEEPER-PORT zk-port
 +                            STORM-ZOOKEEPER-SERVERS ["localhost"]})
 +        nimbus-tmp (local-temp-path)
 +        port-counter (mk-counter supervisor-slot-port-min)
 +        nimbus (nimbus/service-handler
 +                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
 +                (if inimbus inimbus (nimbus/standalone-nimbus)))
 +        context (mk-shared-context daemon-conf)
 +        cluster-map {:nimbus nimbus
 +                     :port-counter port-counter
 +                     :daemon-conf daemon-conf
 +                     :supervisors (atom [])
 +                     :state (mk-distributed-cluster-state daemon-conf)
 +                     :storm-cluster-state (mk-storm-cluster-state daemon-conf)
 +                     :tmp-dirs (atom [nimbus-tmp zk-tmp])
 +                     :zookeeper zk-handle
 +                     :shared-context context}
 +        supervisor-confs (if (sequential? supervisors)
 +                           supervisors
 +                           (repeat supervisors {}))]
 +    (doseq [sc supervisor-confs]
 +      (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))
 +    cluster-map
 +    ))
 +
 +(defn get-supervisor [cluster-map supervisor-id]
 +  (let [finder-fn #(= (.get-id %) supervisor-id)]
 +    (find-first finder-fn @(:supervisors cluster-map))
 +    ))
 +
 +(defn kill-supervisor [cluster-map supervisor-id]
 +  (let [finder-fn #(= (.get-id %) supervisor-id)
 +        supervisors @(:supervisors cluster-map)
 +        sup (find-first finder-fn
 +                        supervisors)]
 +    ;; tmp-dir will be taken care of by shutdown
 +    (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
 +    (.shutdown sup)
 +    ))
 +
 +(defn kill-local-storm-cluster [cluster-map]
 +  (.shutdown (:nimbus cluster-map))
 +  (.close (:state cluster-map))
 +  (.disconnect (:storm-cluster-state cluster-map))
 +  (doseq [s @(:supervisors cluster-map)]
 +    (.shutdown-all-workers s)
 +    ;; race condition here? will it launch the workers again?
 +    (supervisor/kill-supervisor s))
 +  (psim/kill-all-processes)
 +  (log-message "Shutting down in process zookeeper")
 +  (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
 +  (log-message "Done shutting down in process zookeeper")
 +  (doseq [t @(:tmp-dirs cluster-map)]
 +    (log-message "Deleting temporary path " t)
-     (rmr t)
++    (try
++      (rmr t)
++      (catch Exception e (log-message (.getMessage e)))) ;; on windows, the host process still holds a lock on the logfile
 +    ))
 +
 +(def TEST-TIMEOUT-MS 5000)
 +
 +(defmacro while-timeout [timeout-ms condition & body]
 +  `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
 +    (while ~condition
 +      (when (> (System/currentTimeMillis) end-time#)
 +        (throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms)"))))
 +      ~@body)))
 +
 +(defn wait-until-cluster-waiting
 +  "Wait until the cluster is idle. Should be used with time simulation."
 +  [cluster-map]
 +  ;; wait until all workers, supervisors, and nimbus is waiting
 +  (let [supervisors @(:supervisors cluster-map)
 +        workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
 +        daemons (concat
 +                  [(:nimbus cluster-map)]
 +                  supervisors
 +                  workers) ; because a worker may already be dead
 +        ]
 +    (while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
 +      (Thread/sleep 10)
 +;;      (doseq [d daemons]
 +;;        (if-not ((memfn waiting?) d)
 +;;          (println d)))
 +      )))
 +
 +(defn advance-cluster-time
 +  ([cluster-map secs increment-secs]
 +    (loop [left secs]
 +      (when (> left 0)
 +        (let [diff (min left increment-secs)]
 +          (advance-time-secs! diff)
 +          (wait-until-cluster-waiting cluster-map)
 +          (recur (- left diff))
 +          ))))
 +  ([cluster-map secs]
 +    (advance-cluster-time cluster-map secs 1)
 +    ))
 +
 +(defmacro with-local-cluster [[cluster-sym & args] & body]
 +  `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
 +     (try
 +       ~@body
 +     (catch Throwable t#
 +       (log-error t# "Error in cluster")
 +       (throw t#)
 +       )
 +     (finally
 +       (kill-local-storm-cluster ~cluster-sym)))
 +       ))
 +
 +(defmacro with-simulated-time-local-cluster [& args]
 +  `(with-simulated-time
 +    (with-local-cluster ~@args)))
 +
 +(defmacro with-inprocess-zookeeper [port-sym & body]
 +  `(with-local-tmp [tmp#]
 +     (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
 +       (try
 +         ~@body
 +       (finally
 +         (zk/shutdown-inprocess-zookeeper zks#)
 +         ))
 +       )))
 +
 +(defn submit-local-topology [nimbus storm-name conf topology]
 +  (when-not (Utils/isValidConf conf)
 +    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
 +  (.submitTopology nimbus storm-name nil (to-json conf) topology))
 +
 +(defn submit-local-topology-with-opts [nimbus storm-name conf topology submit-opts]
 +  (when-not (Utils/isValidConf conf)
 +    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
 +  (.submitTopologyWithOpts nimbus storm-name nil (to-json conf) topology submit-opts))
 +
 +(defn mocked-compute-new-topology->executor->node+port [storm-name executor->node+port]
 +  (fn [nimbus existing-assignments topologies scratch-topology-id]
 +    (let [topology (.getByName topologies storm-name)
 +          topology-id (.getId topology)
 +          existing-assignments (into {} (for [[tid assignment] existing-assignments]
 +                                          {tid (:executor->node+port assignment)}))
 +          new-assignments (assoc existing-assignments topology-id executor->node+port)]
 +      new-assignments)))
 +
 +(defn submit-mocked-assignment [nimbus storm-name conf topology task->component executor->node+port]
 +  (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
 +                   nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
 +                                                                     storm-name
 +                                                                     executor->node+port)]
 +    (submit-local-topology nimbus storm-name conf topology)
 +    ))
 +
 +(defn mk-capture-launch-fn [capture-atom]
 +  (fn [supervisor storm-id port worker-id]
 +    (let [supervisor-id (:supervisor-id supervisor)
 +          existing (get @capture-atom [supervisor-id port] [])]
 +      (swap! capture-atom assoc [supervisor-id port] (conj existing storm-id))
 +      )))
 +
 +(defn find-worker-id [supervisor-conf port]
 +  (let [supervisor-state (supervisor-state supervisor-conf)
 +        worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
 +    (first ((reverse-map worker->port) port))
 +    ))
 +
 +(defn find-worker-port [supervisor-conf worker-id]
 +  (let [supervisor-state (supervisor-state supervisor-conf)
 +        worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)
 +        ]
 +    (worker->port worker-id)
 +    ))
 +
 +(defn mk-capture-shutdown-fn [capture-atom]
 +  (let [existing-fn supervisor/shutdown-worker]
 +    (fn [supervisor worker-id]
 +      (let [conf (:conf supervisor)
 +            supervisor-id (:supervisor-id supervisor)
 +            port (find-worker-port conf worker-id)
 +            existing (get @capture-atom [supervisor-id port] 0)]      
 +        (swap! capture-atom assoc [supervisor-id port] (inc existing))
 +        (existing-fn supervisor worker-id)
 +        ))))
 +
 +(defmacro capture-changed-workers [& body]
 +  `(let [launch-captured# (atom {})
 +         shutdown-captured# (atom {})]
 +    (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
 +                     supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
 +      ~@body
 +      {:launched @launch-captured#
 +       :shutdown @shutdown-captured#}
 +      )))
 +
 +(defmacro capture-launched-workers [& body]
 +  `(:launched (capture-changed-workers ~@body)))
 +
 +(defmacro capture-shutdown-workers [& body]
 +  `(:shutdown (capture-changed-workers ~@body)))
 +
 +(defnk aggregated-stat [cluster-map storm-name stat-key :component-ids nil]
 +  (let [state (:storm-cluster-state cluster-map)
 +        nimbus (:nimbus cluster-map)
 +        storm-id (common/get-storm-id state storm-name)
 +        
 +        component->tasks (reverse-map
 +                          (common/storm-task-info
 +                           (.getUserTopology nimbus storm-id)
 +                           (from-json (.getTopologyConf nimbus storm-id))))
 +        component->tasks (if component-ids
 +                           (select-keys component->tasks component-ids)
 +                           component->tasks)
 +        task-ids (apply concat (vals component->tasks))
 +        assignment (.assignment-info state storm-id nil) 
 +        taskbeats (.taskbeats state storm-id (:task->node+port assignment))
 +        heartbeats (dofor [id task-ids] (get taskbeats id))
 +        stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
 +    (reduce + stats)
 +    ))
 +
 +(defn emitted-spout-tuples [cluster-map topology storm-name]
 +  (aggregated-stat cluster-map
 +                   storm-name
 +                   :emitted
 +                   :component-ids (keys (.get_spouts topology))))
 +
 +(defn transferred-tuples [cluster-map storm-name]
 +  (aggregated-stat cluster-map storm-name :transferred))
 +
 +(defn acked-tuples [cluster-map storm-name]
 +  (aggregated-stat cluster-map storm-name :acked))
 +
 +(defn simulate-wait [cluster-map]
 +  (if (Time/isSimulating)
 +    (advance-cluster-time cluster-map 10)
 +    (Thread/sleep 100)
 +    ))
 +
 +(defprotocol CompletableSpout
 +  (exhausted? [this] "Whether all the tuples for this spout have been completed.")
 +  (cleanup [this] "Cleanup any global state kept")
 +  (startup [this] "Prepare the spout (globally) before starting the topology"))
 +
 +(extend-type FixedTupleSpout
 +  CompletableSpout
 +  (exhausted? [this]
 +    (= (-> this .getSourceTuples count)
 +       (.getCompleted this)))
 +  (cleanup [this]
 +    (.cleanup this))
 +  (startup [this]
 +    ))
 +
 +(extend-type TransactionalSpoutCoordinator
 +  CompletableSpout
 +  (exhausted? [this]
 +    (exhausted? (.getSpout this)))
 +  (cleanup [this]
 +    (cleanup (.getSpout this)))
 +  (startup [this]
 +    (startup (.getSpout this))))
 +
 +(extend-type PartitionedTransactionalSpoutExecutor
 +  CompletableSpout
 +  (exhausted? [this]
 +    (exhausted? (.getPartitionedSpout this)))
 +  (cleanup [this]
 +    (cleanup (.getPartitionedSpout this)))
 +  (startup [this]
 +    (startup (.getPartitionedSpout this))
 +    ))
 +
 +(extend-type MemoryTransactionalSpout
 +  CompletableSpout
 +  (exhausted? [this]
 +    (.isExhaustedTuples this))
 +  (cleanup [this]
 +    (.cleanup this))
 +  (startup [this]
 +    (.startup this)))
 +
 +(defn spout-objects [spec-map]
 +  (for [[_ spout-spec] spec-map]
 +    (-> spout-spec
 +        .get_spout_object
 +        deserialized-component-object)))
 +
 +(defn capture-topology [topology]
 +  (let [topology (.deepCopy topology)
 +        spouts (.get_spouts topology)
 +        bolts (.get_bolts topology)
 +        all-streams (apply concat
 +                           (for [[id spec] (merge (clojurify-structure spouts)
 +                                                  (clojurify-structure bolts))]
 +                             (for [[stream info] (.. spec get_common get_streams)]
 +                               [(GlobalStreamId. id stream) (.is_direct info)])))
 +        capturer (TupleCaptureBolt.)]
 +    (.set_bolts topology
 +                (assoc (clojurify-structure bolts)
 +                  (uuid)
 +                  (Bolt.                   
 +                   (serialize-component-object capturer)
 +                   (mk-plain-component-common (into {} (for [[id direct?] all-streams]
 +                                                         [id (if direct?
 +                                                               (mk-direct-grouping)
 +                                                               (mk-global-grouping))]))
 +                                              {}
 +                                              nil))
 +                  ))
 +    {:topology topology
 +     :capturer capturer}
 +    ))
 +
 +;; TODO: mock-sources needs to be able to mock out state spouts as well
 +(defnk complete-topology [cluster-map topology :mock-sources {} :storm-conf {} :cleanup-state true :topology-name nil]
 +  ;; TODO: the idea of mocking for transactional topologies should be done an
 +  ;; abstraction level above... should have a complete-transactional-topology for this
 +  (let [{topology :topology capturer :capturer} (capture-topology topology)
 +        storm-name (or topology-name (str "topologytest-" (uuid)))
 +        state (:storm-cluster-state cluster-map)
 +        spouts (.get_spouts topology)
 +        replacements (map-val (fn [v]
 +                                (FixedTupleSpout.
 +                                 (for [tup v]
 +                                   (if (map? tup)
 +                                     (FixedTuple. (:stream tup) (:values tup))
 +                                     tup))))
 +                              mock-sources)
 +        
 +
 +        ]
 +    (doseq [[id spout] replacements]
 +      (let [spout-spec (get spouts id)]
 +        (.set_spout_object spout-spec (serialize-component-object spout))
 +        ))
 +    (doseq [spout (spout-objects spouts)]
 +      (when-not (extends? CompletableSpout (.getClass spout))
 +        (throw (RuntimeException. "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be)"))
 +        ))
 +
 +    (doseq [spout (spout-objects spouts)]
 +      (startup spout))
 +    
 +    (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
 +    
 +    
 +    (let [storm-id (common/get-storm-id state storm-name)]
 +      (while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
 +        (simulate-wait cluster-map))
 +
 +      (.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
 +      (while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
 +        (simulate-wait cluster-map))
 +      (when cleanup-state
 +        (doseq [spout (spout-objects spouts)]
 +          (cleanup spout))))
 +
 +    (if cleanup-state
 +      (.getAndRemoveResults capturer)
 +      (.getAndClearResults capturer))
 +    ))
 +
 +(defn read-tuples
 +  ([results component-id stream-id]
 +     (let [fixed-tuples (get results component-id [])]
 +       (mapcat
 +        (fn [ft]
 +          (if (= stream-id (. ft stream))
 +            [(vec (. ft values))]))
 +        fixed-tuples)
 +       ))
 +  ([results component-id]
 +     (read-tuples results component-id Utils/DEFAULT_STREAM_ID)
 +     ))
 +
 +(defn ms= [& args]  
 +  (apply = (map multi-set args)))
 +
 +(def TRACKER-BOLT-ID "+++tracker-bolt")
 +
 +;; TODO:  should override system-topology! and wrap everything there
 +(defn mk-tracked-topology
 +  ([tracked-cluster topology]
 +     (let [track-id (::track-id tracked-cluster)
 +           ret (.deepCopy topology)]
 +       (dofor [[_ bolt] (.get_bolts ret)
 +               :let [obj (deserialized-component-object (.get_bolt_object bolt))]]
 +              (.set_bolt_object bolt (serialize-component-object
 +                                      (BoltTracker. obj track-id))))
 +       (dofor [[_ spout] (.get_spouts ret)
 +               :let [obj (deserialized-component-object (.get_spout_object spout))]]
 +              (.set_spout_object spout (serialize-component-object
 +                                        (SpoutTracker. obj track-id))))
 +       {:topology ret
 +        :last-spout-emit (atom 0)
 +        :cluster tracked-cluster
 +        }
 +       )))
 +
 +(defn assoc-track-id [cluster track-id]
 +  (assoc cluster ::track-id track-id))
 +
 +(defn increment-global! [id key amt]
 +  (-> (RegisteredGlobalState/getState id)
 +      (get key)
 +      (.addAndGet amt)))
 +
 +(defn global-amt [id key]
 +  (-> (RegisteredGlobalState/getState id)
 +      (get key)
 +      .get
 +      ))
 +
 +(defmacro with-tracked-cluster [[cluster-sym & cluster-args] & body]
 +  `(let [id# (uuid)]
 +     (RegisteredGlobalState/setState id#
 +                                     (doto (ConcurrentHashMap.)
 +                                       (.put "spout-emitted" (AtomicInteger. 0))
 +                                       (.put "transferred" (AtomicInteger. 0))
 +                                       (.put "processed" (AtomicInteger. 0))))
 +     (with-var-roots [acker/mk-acker-bolt (let [old# acker/mk-acker-bolt]
 +                                            (fn [& args#]
 +                                              (NonRichBoltTracker. (apply old# args#) id#)
 +                                              ))
 +                      ;; critical that this particular function is overridden here,
 +                      ;; since the transferred stat needs to be incremented at the moment
 +                      ;; of tuple emission (and not on a separate thread later) for
 +                      ;; topologies to be tracked correctly. This is because "transferred" *must*
 +                      ;; be incremented before "processing".
 +                      executor/mk-executor-transfer-fn
 +                          (let [old# executor/mk-executor-transfer-fn]
 +                            (fn [& args#]
 +                              (let [transferrer# (apply old# args#)]
 +                                (fn [& args2#]
 +                                  ;; (log-message "Transferring: " transfer-args#)
 +                                  (increment-global! id# "transferred" 1)
 +                                  (apply transferrer# args2#)
 +                                  ))))
 +                      ]
 +       (with-local-cluster [~cluster-sym ~@cluster-args]
 +         (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
 +           ~@body)
 +         ))
 +     (RegisteredGlobalState/clearState id#)
 +     ))
 +
 +(defn tracked-wait
 +  "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
 +  ([tracked-topology]
 +     (tracked-wait tracked-topology 1))
 +  ([tracked-topology amt]
 +      (let [target (+ amt @(:last-spout-emit tracked-topology))
 +            track-id (-> tracked-topology :cluster ::track-id)
 +            waiting? (fn []
 +                       (or (not= target (global-amt track-id "spout-emitted"))
 +                           (not= (global-amt track-id "transferred")                                 
 +                                 (global-amt track-id "processed"))
 +                           ))]
 +        (while-timeout TEST-TIMEOUT-MS (waiting?)
 +          ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
 +          ;; (println "Processed: " (global-amt track-id "processed"))
 +          ;; (println "Transferred: " (global-amt track-id "transferred"))
 +          (Thread/sleep 500))
 +        (reset! (:last-spout-emit tracked-topology) target)
 +        )))
 +
 +(defnk test-tuple [values
 +                   :stream Utils/DEFAULT_STREAM_ID
 +                   :component "component"
 +                   :fields nil]
 +  (let [fields (or fields
 +                   (->> (iterate inc 1)
 +                        (take (count values))
 +                        (map #(str "field" %))))
 +        spout-spec (mk-spout-spec* (TestWordSpout.)
 +                                   {stream fields})
 +        topology (StormTopology. {component spout-spec} {} {})
 +        context (TopologyContext.
 +                  topology
 +                  (read-storm-config)
 +                  {(int 1) component}
 +                  {component [(int 1)]}
 +                  {component {stream (Fields. fields)}}
 +                  "test-storm-id"
 +                  nil
 +                  nil
 +                  (int 1)
 +                  nil
 +                  [(int 1)]
 +                  {}
 +                  {}
 +                  (HashMap.)
 +                  (HashMap.)
 +                  (atom false))]
 +    (TupleImpl. context values 1 stream)
 +    ))

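The local-temp-path change in testing.clj above appends a "/" only when
not on Windows: the java.io.tmpdir property conventionally already ends
with a separator on Windows (e.g. C:\Users\x\AppData\Local\Temp\), so no
extra one is needed. A self-contained illustration, reusing the same
on-windows? assumption as the sketch above:

    (def on-windows?
      (= "Windows_NT" (System/getenv "OS")))

    (defn local-temp-path-sketch []
      (str (System/getProperty "java.io.tmpdir")
           (if-not on-windows? "/")  ;; nil on Windows; str drops nil
           (java.util.UUID/randomUUID)))

    ;; Unix:    "/tmp" + "/" + <uuid>      => "/tmp/<uuid>"
    ;; Windows: "C:\\...\\Temp\\" + <uuid> => "C:\\...\\Temp\\<uuid>"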
