storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [02/10] storm git commit: [STORM-1269] port backtype.storm.daemon.common to java
Date Thu, 10 Mar 2016 14:33:49 GMT
[STORM-1269] port backtype.storm.daemon.common to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7241a67
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7241a67
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7241a67

Branch: refs/heads/master
Commit: c7241a67c23899ebb3d6c25cdccde758efb7a0ad
Parents: 87e3c24
Author: basti.lj <basti.lj@alibaba-inc.com>
Authored: Fri Mar 4 15:16:59 2016 +0800
Committer: basti.lj <basti.lj@alibaba-inc.com>
Committed: Fri Mar 4 15:16:59 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/converter.clj      |  15 +
 .../src/clj/org/apache/storm/daemon/common.clj  | 361 +----------
 .../src/clj/org/apache/storm/daemon/drpc.clj    |   6 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  22 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |   5 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  63 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |   9 +-
 .../src/clj/org/apache/storm/daemon/task.clj    |   5 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  24 +-
 storm-core/src/clj/org/apache/storm/testing.clj | 100 +--
 storm-core/src/clj/org/apache/storm/ui/core.clj |  18 +-
 .../org/apache/storm/daemon/DaemonCommon.java   |  22 +
 .../org/apache/storm/daemon/StormCommon.java    | 605 +++++++++++++++++++
 .../storm/utils/StormCommonInstaller.java       |  43 ++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  50 ++
 .../org/apache/storm/integration_test.clj       |   6 +-
 .../storm/messaging/netty_integration_test.clj  |   1 -
 .../test/clj/org/apache/storm/nimbus_test.clj   | 121 ++--
 .../apache/storm/security/auth/auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  11 +-
 .../utils/staticmocking/CommonInstaller.java    |  38 ++
 21 files changed, 981 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index e269c5d..8b5bc3e 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -73,6 +73,13 @@
                                                           (:worker->resources assignment)))))
     thrift-assignment))
 
+(defn clojurify-task->node_port [task->node_port]
+  (into {}
+    (map-val
+      (fn [nodeInfo]
+        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
+      task->node_port)))
+
 ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
 (defn clojurify-executor->node_port [executor->node_port]
   (into {}
@@ -84,6 +91,14 @@
           (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
         executor->node_port))))
 
+(defn thriftify-executor->node_port [executor->node_port]
+  (into {}
+    (map (fn [[k v]]
+            [(map long k)
+             (NodeInfo. (first v) (set (map long (rest v))))])
+          executor->node_port))
+)
+
 (defn clojurify-worker->resources [worker->resources]
   "convert worker info to be [node, port]
    convert resources to be [mem_on_heap mem_off_heap cpu]"

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 65cf233..cc5436c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,53 +15,10 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.common
   (:use [org.apache.storm log config util])
-  (:import [org.apache.storm.generated StormTopology NodeInfo
-            InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields]
-           [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
-           [org.apache.storm.daemon.metrics.reporters PreparableReporter]
-           [com.codahale.metrics MetricRegistry])
-  (:import [org.apache.storm.daemon.metrics MetricsUtils])
-  (:import [org.apache.storm.task WorkerTopologyContext])
-  (:import [org.apache.storm Constants])
-  (:import [org.apache.storm.cluster StormClusterStateImpl])
-  (:import [org.apache.storm.metric SystemBolt])
-  (:import [org.apache.storm.metric EventLoggerBolt])
-  (:import [org.apache.storm.security.auth IAuthorizer])
-  (:import [java.io InterruptedIOException]
-           [org.json.simple JSONValue])
-  (:import [java.util HashMap])
-  (:import [org.apache.storm Thrift]
-           (org.apache.storm.daemon Acker))
   (:require [clojure.set :as set])
-  (:require [metrics.reporters.jmx :as jmx])
-  (:require [metrics.core  :refer [default-registry]]))
-
-(defn start-metrics-reporter [reporter conf]
-  (doto reporter
-    (.prepare default-registry conf)
-    (.start))
-  (log-message "Started statistics report plugin..."))
-
-(defn start-metrics-reporters [conf]
-  (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
-    (start-metrics-reporter reporter conf)))
-
-
-(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
-
-(def SYSTEM-STREAM-ID "__system")
-
-(def EVENTLOGGER-COMPONENT-ID "__eventlogger")
-(def EVENTLOGGER-STREAM-ID "__eventlog")
-
-(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
-(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
-(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
-(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
-(def CREDENTIALS-CHANGED-STREAM-ID Constants/CREDENTIALS_CHANGED_STREAM_ID)
+  (:import (org.apache.storm.task WorkerTopologyContext)
+           (org.apache.storm.utils Utils ConfigUtils)
+           (java.io InterruptedIOException)))
 
 ;; the task id is the virtual port
 ;; node->host is here so that tasks know who to talk to just from assignment
@@ -74,9 +31,6 @@
 
 (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
 
-(defprotocol DaemonCommon
-  (waiting? [this]))
-
 (defrecord ExecutorStats [^long processed
                           ^long acked
                           ^long emitted
@@ -86,26 +40,6 @@
 (defn new-executor-stats []
   (ExecutorStats. 0 0 0 0 0))
 
-
-(defn get-storm-id [storm-cluster-state storm-name]
-  (let [active-storms (.activeStorms storm-cluster-state)
-        pred  (reify IPredicate (test [this x] (= storm-name (.get_name (.stormBase storm-cluster-state x nil)))))]
-    (Utils/findOne pred active-storms)
-    ))
-
-(defn topology-bases [storm-cluster-state]
-  (let [active-topologies (.activeStorms storm-cluster-state)]
-    (into {}
-          (dofor [id active-topologies]
-                 [id  (.stormBase storm-cluster-state id nil)]
-                 ))
-    ))
-
-(defn validate-distributed-mode! [conf]
-  (if (ConfigUtils/isLocalMode conf)
-      (throw
-        (IllegalArgumentException. "Cannot start server in local mode!"))))
-
 (defmacro defserverfn [name & body]
   `(let [exec-fn# (fn ~@body)]
     (defn ~name [& args#]
@@ -120,279 +54,6 @@
         (Utils/exitProcess 13 "Error on initialization")
         )))))
 
-(defn- validate-ids! [^StormTopology topology]
-  (let [sets (map #(.getFieldValue topology %) (Thrift/getTopologyFields))
-        offending (apply set/intersection sets)]
-    (if-not (empty? offending)
-      (throw (InvalidTopologyException.
-              (str "Duplicate component ids: " offending))))
-    (doseq [f (Thrift/getTopologyFields)
-            :let [obj-map (.getFieldValue topology f)]]
-      (if-not (ThriftTopologyUtils/isWorkerHook f)
-        (do
-          (doseq [id (keys obj-map)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid component id")))))
-          (doseq [obj (vals obj-map)
-                  id (-> obj .get_common .get_streams keys)]
-            (if (Utils/isSystemId id)
-              (throw (InvalidTopologyException.
-                       (str id " is not a valid stream id"))))))))))
-
-(defn all-components [^StormTopology topology]
-  (apply merge {}
-    (for [f (Thrift/getTopologyFields)]
-      (if-not (ThriftTopologyUtils/isWorkerHook f)
-        (.getFieldValue topology f)))))
-
-(defn component-conf [component]
-  (->> component
-       .get_common
-       .get_json_conf
-       (#(if % (JSONValue/parse %)))
-       clojurify-structure))
-
-(defn validate-basic! [^StormTopology topology]
-  (validate-ids! topology)
-  (doseq [f (Thrift/getSpoutFields)
-          obj (->> f (.getFieldValue topology) vals)]
-    (if-not (empty? (-> obj .get_common .get_inputs))
-      (throw (InvalidTopologyException. "May not declare inputs for a spout"))))
-  (doseq [[comp-id comp] (all-components topology)
-          :let [conf (component-conf comp)
-                p (-> comp .get_common (Thrift/getParallelismHint))]]
-    (when (and (> (conf TOPOLOGY-TASKS) 0)
-               p
-               (<= p 0))
-      (throw (InvalidTopologyException. "Number of executors must be greater than 0 when number of tasks is greater than 0"))
-      )))
-
-(defn validate-structure! [^StormTopology topology]
-  ;; validate all the component subscribe from component+stream which actually exists in the topology
-  ;; and if it is a fields grouping, validate the corresponding field exists  
-  (let [all-components (all-components topology)]
-    (doseq [[id comp] all-components
-            :let [inputs (.. comp get_common get_inputs)]]
-      (doseq [[global-stream-id grouping] inputs
-              :let [source-component-id (.get_componentId global-stream-id)
-                    source-stream-id    (.get_streamId global-stream-id)]]
-        (if-not (contains? all-components source-component-id)
-          (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent component [" source-component-id "]")))
-          (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
-            (if-not (contains? source-streams source-stream-id)
-              (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from non-existent stream: [" source-stream-id "] of component [" source-component-id "]")))
-              (if (= Grouping$_Fields/FIELDS (Thrift/groupingType grouping))
-                (let [grouping-fields (set (.get_fields grouping))
-                      source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
-                      diff-fields (set/difference grouping-fields source-stream-fields)]
-                  (when-not (empty? diff-fields)
-                    (throw (InvalidTopologyException. (str "Component: [" id "] subscribes from stream: [" source-stream-id "] of component [" source-component-id "] with non-existent fields: " diff-fields)))))))))))))
-
-(defn acker-inputs [^StormTopology topology]
-  (let [bolt-ids (.. topology get_bolts keySet)
-        spout-ids (.. topology get_spouts keySet)
-        spout-inputs (apply merge
-                            (for [id spout-ids]
-                              {(Utils/getGlobalStreamId id ACKER-INIT-STREAM-ID)
-                                (Thrift/prepareFieldsGrouping ["id"])}
-                              ))
-        bolt-inputs (apply merge
-                           (for [id bolt-ids]
-                             {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID)
-                              (Thrift/prepareFieldsGrouping ["id"])
-                              (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID)
-                              (Thrift/prepareFieldsGrouping ["id"])}
-                             ))]
-    (merge spout-inputs bolt-inputs)))
-
-;; the event logger receives inputs from all the spouts and bolts
-;; with a field grouping on component id so that all tuples from a component
-;; goes to same executor and can be viewed via logviewer.
-(defn eventlogger-inputs [^StormTopology topology]
-  (let [bolt-ids (.. topology get_bolts keySet)
-        spout-ids (.. topology get_spouts keySet)
-        spout-inputs (apply merge
-                       (for [id spout-ids]
-                         {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
-                          (Thrift/prepareFieldsGrouping ["component-id"])}
-                         ))
-        bolt-inputs (apply merge
-                      (for [id bolt-ids]
-                        {(Utils/getGlobalStreamId id EVENTLOGGER-STREAM-ID)
-                         (Thrift/prepareFieldsGrouping ["component-id"])}
-                        ))]
-    (merge spout-inputs bolt-inputs)))
-
-(defn mk-acker-bolt []
-  (Acker.))
-
-(defn add-acker! [storm-conf ^StormTopology ret]
-  (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
-        acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret)
-                                                        (mk-acker-bolt)
-                                                        {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
-                                                         ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
-                                                        }
-                                                        (Integer. num-executors)
-                                                        {TOPOLOGY-TASKS num-executors
-                                                         TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
-    (dofor [[_ bolt] (.get_bolts ret)
-            :let [common (.get_common bolt)]]
-           (do
-             (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"]))
-             (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"]))
-             ))
-    (dofor [[_ spout] (.get_spouts ret)
-            :let [common (.get_common spout)
-                  spout-conf (merge
-                               (component-conf spout)
-                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
-      (do
-        ;; this set up tick tuples to cause timeouts to be triggered
-        (.set_json_conf common (JSONValue/toJSONString spout-conf))
-        (.put_to_streams common ACKER-INIT-STREAM-ID (Thrift/outputFields ["id" "init-val" "spout-task"]))
-        (.put_to_inputs common
-                        (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
-                        (Thrift/prepareDirectGrouping))
-        (.put_to_inputs common
-                        (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
-                        (Thrift/prepareDirectGrouping))
-        ))
-    (.put_to_bolts ret "__acker" acker-bolt)
-    ))
-
-(defn add-metric-streams! [^StormTopology topology]
-  (doseq [[_ component] (all-components topology)
-          :let [common (.get_common component)]]
-    (.put_to_streams common METRICS-STREAM-ID
-                     (Thrift/outputFields ["task-info" "data-points"]))))
-
-(defn add-system-streams! [^StormTopology topology]
-  (doseq [[_ component] (all-components topology)
-          :let [common (.get_common component)]]
-    (.put_to_streams common SYSTEM-STREAM-ID (Thrift/outputFields ["event"]))))
-
-
-(defn map-occurrences [afn coll]
-  (->> coll
-       (reduce (fn [[counts new-coll] x]
-                 (let [occurs (inc (get counts x 0))]
-                   [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
-               [{} []])
-       (second)
-       (reverse)))
-
-(defn number-duplicates
-  "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
-  [coll]
-  (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
-
-(defn metrics-consumer-register-ids
-  "Generates a list of component ids for each metrics consumer
-   e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
-  [storm-conf]
-  (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
-       (map #(get % "class"))
-       (number-duplicates)
-       (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
-
-(defn metrics-consumer-bolt-specs [storm-conf topology]
-  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
-        inputs (->> (for [comp-id component-ids-that-emit-metrics]
-                      {(Utils/getGlobalStreamId comp-id METRICS-STREAM-ID)
-                       (Thrift/prepareShuffleGrouping)})
-                    (into {}))
-        mk-bolt-spec (fn [class arg p]
-                       (Thrift/prepareSerializedBoltDetails
-                         inputs
-                         (org.apache.storm.metric.MetricsConsumerBolt. class arg)
-                         {}
-                         (Integer. p)
-                         {TOPOLOGY-TASKS p}))]
-
-    (map
-     (fn [component-id register]
-       [component-id (mk-bolt-spec (get register "class")
-                                   (get register "argument")
-                                   (or (get register "parallelism.hint") 1))])
-     (metrics-consumer-register-ids storm-conf)
-     (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
-
-;; return the fields that event logger bolt expects
-(defn eventlogger-bolt-fields []
-  [(EventLoggerBolt/FIELD_COMPONENT_ID) (EventLoggerBolt/FIELD_MESSAGE_ID)  (EventLoggerBolt/FIELD_TS) (EventLoggerBolt/FIELD_VALUES)]
-  )
-
-(defn add-eventlogger! [storm-conf ^StormTopology ret]
-  (let [num-executors (if (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS))
-        eventlogger-bolt (Thrift/prepareSerializedBoltDetails (eventlogger-inputs ret)
-                           (EventLoggerBolt.)
-                           {}
-                           (Integer. num-executors)
-                           {TOPOLOGY-TASKS num-executors
-                            TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
-
-    (doseq [[_ component] (all-components ret)
-            :let [common (.get_common component)]]
-      (.put_to_streams common EVENTLOGGER-STREAM-ID (Thrift/outputFields (eventlogger-bolt-fields))))
-    (.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
-    ))
-
-(defn add-metric-components! [storm-conf ^StormTopology topology]
-  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
-    (.put_to_bolts topology comp-id bolt-spec)))
-
-(defn add-system-components! [conf ^StormTopology topology]
-  (let [system-bolt-spec (Thrift/prepareSerializedBoltDetails
-                          {}
-                          (SystemBolt.)
-                          {SYSTEM-TICK-STREAM-ID (Thrift/outputFields ["rate_secs"])
-                           METRICS-TICK-STREAM-ID (Thrift/outputFields ["interval"])
-                           CREDENTIALS-CHANGED-STREAM-ID (Thrift/outputFields ["creds"])}
-                          (Integer. 0)
-                          {TOPOLOGY-TASKS 0})]
-    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
-
-(defn system-topology! [storm-conf ^StormTopology topology]
-  (validate-basic! topology)
-  (let [ret (.deepCopy topology)]
-    (add-acker! storm-conf ret)
-    (add-eventlogger! storm-conf ret)
-    (add-metric-components! storm-conf ret)
-    (add-system-components! storm-conf ret)
-    (add-metric-streams! ret)
-    (add-system-streams! ret)
-    (validate-structure! ret)
-    ret
-    ))
-
-(defn has-ackers? [storm-conf]
-  (or (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0)))
-
-(defn has-eventloggers? [storm-conf]
-  (or (nil? (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS)) (> (storm-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) 0)))
-
-(defn num-start-executors [component]
-  (Thrift/getParallelismHint (.get_common component)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn storm-task-info
-  "Returns map from task -> component id"
-  [^StormTopology user-topology storm-conf]
-  (->> (system-topology! storm-conf user-topology)
-       all-components
-       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
-       (sort-by first)
-       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
-       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
-       (into {})
-       ))
-
-(defn executor-id->tasks [[first-task-id last-task-id]]
-  (->> (range first-task-id (inc last-task-id))
-       (map int)))
-
 (defn worker-context [worker]
   (WorkerTopologyContext. (:system-topology worker)
                           (:storm-conf worker)
@@ -408,19 +69,3 @@
                           (:default-shared-resources worker)
                           (:user-shared-resources worker)
                           ))
-
-
-(defn to-task->node+port [executor->node+port]
-  (->> executor->node+port
-       (mapcat (fn [[e node+port]] (for [t (executor-id->tasks e)] [t node+port])))
-       (into {})))
-
-(defn mk-authorization-handler [klassname conf]
-  (let [aznClass (if klassname (Class/forName klassname))
-        aznHandler (if aznClass (.newInstance aznClass))]
-    (if aznHandler (.prepare ^IAuthorizer aznHandler conf))
-    (log-debug "authorization class name:" klassname
-                 " class:" aznClass
-                 " handler:" aznHandler)
-    aznHandler
-  ))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index 001e810..24d7f2c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -24,7 +24,7 @@
             DistributedRPCInvocations$Processor])
   (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
             ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
-  (:import [org.apache.storm.daemon Shutdownable]
+  (:import [org.apache.storm.daemon Shutdownable StormCommon]
            [org.apache.storm.utils Time])
   (:import [java.net InetAddress])
   (:import [org.apache.storm.generated AuthorizationException]
@@ -75,7 +75,7 @@
 
 ;; TODO: change this to use TimeCacheMap
 (defn service-handler [conf]
-  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
+  (let [drpc-acl-handler (StormCommon/mkAuthorizationHandler (conf DRPC-AUTHORIZER) conf)
         ctr (atom 0)
         id->sem (atom {})
         id->result (atom {})
@@ -268,7 +268,7 @@
                                      https-need-client-auth
                                      https-want-client-auth)
                                    (UIHelpers/configFilter server (ring.util.servlet/servlet app) filters-confs))))))
-      (start-metrics-reporters conf)
+      (StormCommon/startMetricsReporters conf)
       (when handler-server
         (.serve handler-server)))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ff93f8..0f95e28 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -31,7 +31,7 @@
   (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
   (:import [com.lmax.disruptor InsufficientCapacityException])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.daemon Shutdownable StormCommon])
   (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
@@ -228,7 +228,7 @@
 
 (defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
-        task-ids (executor-id->tasks executor-id)
+        task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
         component-id (.getComponentId worker-context (first task-ids))
         storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
         executor-type (executor-type worker-context component-id)
@@ -498,7 +498,7 @@
       (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
         (task/send-unanchored
           task-data
-          EVENTLOGGER-STREAM-ID
+          StormCommon/EVENTLOGGER_STREAM_ID
           [component-id message-id (System/currentTimeMillis) values]))))
 
 (defmethod mk-threads :spout [executor-data task-datas initial-credentials]
@@ -536,17 +536,17 @@
                                     (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
                                   (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
                                     (condp = stream-id
-                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
+                                      StormCommon/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
                                                                          spout-id tuple-finished-info time-delta id)
-                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
+                                      StormCommon/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
                                                                            spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
                                       )))
                                 ;; TODO: on failure, emit tuple to failure stream
                                 ))))
         receive-queue (:receive-queue executor-data)
         event-handler (mk-task-receiver executor-data tuple-action-fn)
-        has-ackers? (has-ackers? storm-conf)
-        has-eventloggers? (has-eventloggers? storm-conf)
+        has-ackers? (clojurify-structure (StormCommon/hasAckers storm-conf))
+        has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
         emitted-count (MutableLong. 0)
         empty-emit-streak (MutableLong. 0)
         spout-transfer-fn (fn []
@@ -587,7 +587,7 @@
                                                                                          :values (if debug? values nil)}
                                                                                         (if (sampler) (System/currentTimeMillis))])
                                                                  (task/send-unanchored task-data
-                                                                                       ACKER-INIT-STREAM-ID
+                                                                                       StormCommon/ACKER_INIT_STREAM_ID
                                                                                        [root-id (Utils/bitXorVals out-ids) task-id]))
                                                                (when message-id
                                                                  (ack-spout-msg executor-data task-data message-id
@@ -742,7 +742,7 @@
                                                                (.getSourceComponent tuple)
                                                                (.getSourceStreamId tuple)
                                                                delta)))))))
-        has-eventloggers? (has-eventloggers? storm-conf)
+        has-eventloggers? (clojurify-structure (StormCommon/hasEventLoggers storm-conf))
         bolt-transfer-fn (fn []
                            ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
                            (while (not @(:storm-active-atom executor-data))
@@ -803,7 +803,7 @@
                                                   ack-val (.getAckVal tuple)]
                                               (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
                                                              (task/send-unanchored task-data
-                                                                                   ACKER-ACK-STREAM-ID
+                                                                                   StormCommon/ACKER_ACK_STREAM_ID
                                                                                    [root (bit-xor id ack-val)])))
                                             (let [delta (tuple-time-delta! tuple)
                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]
@@ -818,7 +818,7 @@
                                           (^void fail [this ^Tuple tuple]
                                             (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                                                             (task/send-unanchored task-data
-                                                                                  ACKER-FAIL-STREAM-ID
+                                                                                  StormCommon/ACKER_FAIL_STREAM_ID
                                                                                   [root]))
                                             (let [delta (tuple-time-delta! tuple)
                                                   debug? (= true (storm-conf TOPOLOGY-DEBUG))]

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 221dad7..8f28e36 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -33,7 +33,7 @@
            [java.net URLDecoder])
   (:import [java.nio.file Files Path Paths DirectoryStream])
   (:import [java.nio ByteBuffer])
-  (:import [org.apache.storm.daemon DirectoryCleaner])
+  (:import [org.apache.storm.daemon DirectoryCleaner StormCommon])
   (:import [org.yaml.snakeyaml Yaml]
            [org.yaml.snakeyaml.constructor SafeConstructor])
   (:import [org.apache.storm.ui InvalidRequestException UIHelpers IConfigurator FilterConfiguration]
@@ -46,7 +46,6 @@
             [ring.util.response :as resp]
             [clojure.string :as string])
   (:require [metrics.meters :refer [defmeter mark!]])
-  (:use [org.apache.storm.daemon.common :only [start-metrics-reporters]])
   (:gen-class))
 
 (def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
@@ -1208,4 +1207,4 @@
                  STORM-VERSION
                  "'")
     (start-logviewer! conf log-root daemonlog-root)
-    (start-metrics-reporters conf)))
+    (StormCommon/startMetricsReporters conf)))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index ed26a79..673f15d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -47,7 +47,7 @@
             ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
             BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
             ProfileRequest ProfileAction NodeInfo LSTopoHistory])
-  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
   (:import [org.apache.storm.validation ConfigValidation])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
   (:use [org.apache.storm util config log converter])
@@ -173,8 +173,8 @@
     {:conf conf
      :nimbus-host-port-info (NimbusInfo/fromConf conf)
      :inimbus inimbus
-     :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf)
-     :impersonation-authorization-handler (mk-authorization-handler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
+     :authorization-handler (StormCommon/mkAuthorizationHandler (conf NIMBUS-AUTHORIZER) conf)
+     :impersonation-authorization-handler (StormCommon/mkAuthorizationHandler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
      :submitted-count (atom 0)
      :storm-cluster-state (ClusterUtils/mkStormClusterState conf  (when
                                                                        (Utils/isZkAuthenticationConfiguredStormServer
@@ -371,7 +371,7 @@
        )))
 
 (defn transition-name! [nimbus storm-name event & args]
-  (let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name)]
+  (let [storm-id (StormCommon/getStormId (:storm-cluster-state nimbus) storm-name)]
     (when-not storm-id
       (throw (NotAliveException. storm-name)))
     (apply transition! nimbus storm-id event args)))
@@ -651,8 +651,8 @@
         component->executors (:component->executors storm-base)
         storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
-        task->component (storm-task-info topology storm-conf)]
-    (->> (storm-task-info topology storm-conf)
+        task->component (clojurify-structure(StormCommon/stormTaskInfo topology storm-conf))]
+    (->> (StormCommon/stormTaskInfo topology storm-conf)
          (Utils/reverseMap)
          clojurify-structure
          (map-val sort)
@@ -669,7 +669,7 @@
         executors (compute-executors nimbus storm-id)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
         storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
-        task->component (storm-task-info topology storm-conf)
+        task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
         executor->component (into {} (for [executor executors
                                            :let [start-task (first executor)
                                                  component (task->component start-task)]]
@@ -1001,8 +1001,8 @@
         conf (:conf nimbus)
         blob-store (:blob-store nimbus)
         storm-conf (read-storm-conf conf storm-id blob-store)
-        topology (system-topology! storm-conf (read-storm-topology storm-id blob-store))
-        num-executors (->> (all-components topology) (map-val num-start-executors))]
+        topology (StormCommon/systemTopology storm-conf (read-storm-topology storm-id blob-store))
+        num-executors (->> (clojurify-structure (StormCommon/allComponents topology)) (map-val #(StormCommon/numStartExecutors %)))]
     (log-message "Activating " storm-name ": " storm-id)
     (.activateStorm storm-cluster-state
                       storm-id
@@ -1024,7 +1024,7 @@
 ;; 3. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
 
 (defn storm-active? [storm-cluster-state storm-name]
-  (not-nil? (get-storm-id storm-cluster-state storm-name)))
+  (not-nil? (StormCommon/getStormId storm-cluster-state storm-name)))
 
 (defn check-storm-active! [nimbus storm-name active?]
   (if (= (not active?)
@@ -1085,8 +1085,8 @@
        ))
 
 (defn- component-parallelism [storm-conf component]
-  (let [storm-conf (merge storm-conf (component-conf component))
-        num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
+  (let [storm-conf (merge storm-conf (clojurify-structure (StormCommon/componentConf component)))
+        num-tasks (or (storm-conf TOPOLOGY-TASKS) (StormCommon/numStartExecutors component))
         max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
         ]
     (if max-parallelism
@@ -1095,11 +1095,11 @@
 
 (defn normalize-topology [storm-conf ^StormTopology topology]
   (let [ret (.deepCopy topology)]
-    (doseq [[_ component] (all-components ret)]
+    (doseq [[_ component] (clojurify-structure (StormCommon/allComponents ret))]
       (.set_json_conf
         (.get_common component)
         (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
-             (merge (component-conf component))
+             (merge (clojurify-structure (StormCommon/componentConf component)))
              JSONValue/toJSONString)))
     ret ))
 
@@ -1255,7 +1255,7 @@
   [conf storm-name nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
-        id (get-storm-id storm-cluster-state storm-name)]
+        id (StormCommon/getStormId storm-cluster-state storm-name)]
     (try-read-storm-conf conf id blob-store)))
 
 (defn try-read-storm-topology
@@ -1337,7 +1337,7 @@
 (defn validate-topology-size [topo-conf nimbus-conf topology]
   (let [workers-count (get topo-conf TOPOLOGY-WORKERS)
         workers-allowed (get nimbus-conf NIMBUS-SLOTS-PER-TOPOLOGY)
-        num-executors (->> (all-components topology) (map-val num-start-executors))
+        num-executors (->> (StormCommon/allComponents topology) clojurify-structure (map-val #(StormCommon/numStartExecutors %)))
         executors-count (reduce + (vals num-executors))
         executors-allowed (get nimbus-conf NIMBUS-EXECUTORS-PER-TOPOLOGY)]
     (when (and
@@ -1354,12 +1354,8 @@
         (str "Failed to submit topology. Topology requests more than " workers-allowed " workers."))))))
 
 (defn nimbus-topology-bases [storm-cluster-state]
-  (let [active-topologies (.activeStorms storm-cluster-state)]
-    (into {}
-      (dofor [id active-topologies]
-        [id  (clojurify-storm-base (.stormBase storm-cluster-state id nil))]
-        ))
-    ))
+  map-val #(clojurify-storm-base %) (clojurify-structure
+                                        (StormCommon/topologyBases storm-cluster-state)))
 
 (defn- set-logger-timeouts [log-config]
   (let [timeout-secs (.get_reset_log_level_timeout_secs log-config)
@@ -1409,7 +1405,7 @@
                                           topology-conf
                                           operation)
                   topology (try-read-storm-topology storm-id blob-store)
-                  task->component (storm-task-info topology topology-conf)
+                  task->component (clojurify-structure (StormCommon/stormTaskInfo topology topology-conf))
                   base (clojurify-storm-base (.stormBase storm-cluster-state storm-id nil))
                   launch-time-secs (if base (:launch-time-secs base)
                                      (throw
@@ -1490,7 +1486,7 @@
     (defgauge nimbus:num-supervisors
       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
 
-    (start-metrics-reporters conf)
+    (StormCommon/startMetricsReporters conf)
 
     (reify Nimbus$Iface
       (^void submitTopologyWithOpts
@@ -1546,7 +1542,7 @@
               (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf))))
             (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user))))
               (throw (AuthorizationException. "Could not determine the user to run this topology as.")))
-            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
+            (StormCommon/systemTopology total-storm-conf topology) ;; this validates the structure of the topology
             (validate-topology-size topo-conf conf topology)
             (when (and (Utils/isZkAuthenticationConfiguredStormServer conf)
                        (not (Utils/isZkAuthenticationConfiguredTopology storm-conf)))
@@ -1599,7 +1595,7 @@
             (notify-topology-action-listener nimbus storm-name operation))
           (if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE)
             (.removeBackpressure (:storm-cluster-state nimbus) storm-id))
-          (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
+          (add-topology-to-history-log (StormCommon/getStormId (:storm-cluster-state nimbus) storm-name)
             nimbus topology-conf)))
 
       (^void rebalance [this ^String storm-name ^RebalanceOptions options]
@@ -1642,7 +1638,7 @@
       (debug [this storm-name component-id enable? samplingPct]
         (mark! nimbus:num-debug-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
-              storm-id (get-storm-id storm-cluster-state storm-name)
+              storm-id (StormCommon/getStormId storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id blob-store)
               ;; make sure samplingPct is within bounds.
               spct (Math/max (Math/min samplingPct 100.0) 0.0)
@@ -1721,7 +1717,7 @@
       (uploadNewCredentials [this storm-name credentials]
         (mark! nimbus:num-uploadNewCredentials-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
-              storm-id (get-storm-id storm-cluster-state storm-name)
+              storm-id (StormCommon/getStormId storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id blob-store)
               creds (when credentials (.get_creds credentials))]
           (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials")
@@ -1815,7 +1811,7 @@
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopology")
-              (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
+              (StormCommon/systemTopology topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
 
       (^StormTopology getUserTopology [this ^String id]
         (mark! nimbus:num-getUserTopology-calls)
@@ -1863,7 +1859,7 @@
                                                      (:storm-name base)
                                                      (->> (:executor->node+port assignment)
                                                        keys
-                                                       (mapcat executor-id->tasks)
+                                                       (mapcat #(clojurify-structure (StormCommon/executorIdToTasks %)))
                                                        count)
                                                      (->> (:executor->node+port assignment)
                                                        keys
@@ -2187,7 +2183,7 @@
           ;; Add the event logger details.
           (let [component->tasks (clojurify-structure (Utils/reverseMap (:task->component info)))
                 eventlogger-tasks (sort (get component->tasks
-                                             EVENTLOGGER-COMPONENT-ID))
+                                             StormCommon/EVENTLOGGER_COMPONENT_ID))
                 ;; Find the task the events from this component route to.
                 task-index (mod (TupleUtils/listHashCode [component-id])
                                 (count eventlogger-tasks))
@@ -2204,7 +2200,6 @@
 
       (^TopologyHistoryInfo getTopologyHistory [this ^String user]
         (let [storm-cluster-state (:storm-cluster-state nimbus)
-              bases (topology-bases storm-cluster-state)
               assigned-topology-ids (.assignments storm-cluster-state nil)
               user-group-match-fn (fn [topo-id user conf]
                                     (let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus))
@@ -2230,7 +2225,7 @@
         (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus)))
         (log-message "Shut down master"))
       DaemonCommon
-      (waiting? [this]
+      (isWaiting [this]
         (.isTimerWaiting (:timer nimbus))))))
 
 (defn validate-port-available[conf]
@@ -2242,7 +2237,7 @@
       (System/exit 0))))
 
 (defn launch-server! [conf nimbus]
-  (validate-distributed-mode! conf)
+  (StormCommon/validateDistributedMode conf)
   (validate-port-available conf)
   (let [service-handler (service-handler conf nimbus)
         server (ThriftServer. conf (Nimbus$Processor. service-handler)

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 7295679..20cf7f2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -18,7 +18,7 @@
   (:import [org.apache.storm.scheduler ISupervisor]
            [org.apache.storm.utils LocalState Time Utils Utils$ExitCodeCallable
                                    ConfigUtils]
-           [org.apache.storm.daemon Shutdownable]
+           [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon]
            [org.apache.storm Constants]
            [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils IStateStorage]
            [java.net JarURLConnection]
@@ -35,7 +35,6 @@
   (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm.command HealthCheck])
   (:require [org.apache.storm.daemon [worker :as worker]]
-
             [clojure.set :as set])
   (:import [org.apache.thrift.transport TTransportException])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
@@ -956,7 +955,7 @@
            (shutdown-worker supervisor id)
            )))
      DaemonCommon
-     (waiting? [this]
+     (isWaiting [this]
        (or (not @(:active supervisor))
            (and
             (.isTimerWaiting (:heartbeat-timer supervisor))
@@ -1319,11 +1318,11 @@
   [supervisor]
   (log-message "Starting supervisor for storm version '" STORM-VERSION "'")
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
-    (validate-distributed-mode! conf)
+    (StormCommon/validateDistributedMode conf)
     (let [supervisor (mk-supervisor conf nil supervisor)]
       (Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown supervisor)))
     (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
-    (start-metrics-reporters conf)))
+    (StormCommon/startMetricsReporters conf)))
 
 (defn standalone-supervisor []
   (let [conf-atom (atom nil)

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 77abdec..f6c536d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -27,7 +27,8 @@
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
-  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm Thrift]
+           (org.apache.storm.daemon StormCommon))
   (:require [org.apache.storm
              [stats :as stats]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
@@ -186,6 +187,6 @@
       (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
     ;; when this is called, the threads for the executor haven't been started yet,
     ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
-    (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
+    (send-unanchored task-data StormCommon/SYSTEM_STREAM_ID ["startup"])
     task-data
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92ba807..e1b0185 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -32,7 +32,7 @@
   (:import [org.apache.storm.grouping LoadMapping])
   (:import [org.apache.storm.messaging TransportFactory])
   (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
-  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
   (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
   (:import [org.apache.storm.tuple AddressedTuple Fields])
@@ -254,6 +254,9 @@
         (log-error e "Error when processing event")
         (Utils/exitProcess 20 "Error when processing an event")))))
 
+(defn executor->tasks [executor-id]
+  clojurify-structure (StormCommon/executorIdToTasks executor-id))
+
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state]
   (let [assignment-versions (atom {})
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
@@ -265,7 +268,7 @@
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
 
         receive-queue-map (->> executor-receive-queue-map
-                               (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
+                               (mapcat (fn [[e queue]] (for [t (executor->tasks e)] [t queue])))
                                (into {}))
 
         topology (ConfigUtils/readSupervisorTopology conf storm-id)
@@ -293,7 +296,7 @@
       :task-ids (->> receive-queue-map keys (map int) sort)
       :storm-conf storm-conf
       :topology topology
-      :system-topology (system-topology! storm-conf topology)
+      :system-topology (StormCommon/systemTopology storm-conf topology)
       :heartbeat-timer (mk-halting-timer "heartbeat-timer")
       :refresh-load-timer (mk-halting-timer "refresh-load-timer")
       :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
@@ -302,7 +305,7 @@
       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
       :user-timer (mk-halting-timer "user-timer")
-      :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
+      :task->component (StormCommon/stormTaskInfo topology storm-conf) ; for optimized access when used in tasks later on
       :component->stream->fields (component->stream->fields (:system-topology <>))
       ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
       :component->sorted-tasks (->> (:task->component <>) (Utils/reverseMap) (clojurify-structure) (map-val sort))
@@ -314,7 +317,7 @@
       ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
       :short-executor-receive-queue-map (map-key first executor-receive-queue-map)
       :task->short-executor (->> executors
-                                 (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
+                                 (mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
                                  (into {})
                                  (HashMap.))
       :suicide-fn (mk-suicide-fn conf)
@@ -378,6 +381,11 @@
          ~@body
          (finally (.unlock wlock#))))))
 
+(defn task->node_port [executor->node_port]
+  (let [executor->nodeport (thriftify-executor->node_port executor->node_port)]
+    (clojurify-task->node_port (StormCommon/taskToNodeport executor->nodeport)))
+  )
+
 ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn mk-refresh-connections [worker]
   (let [outbound-tasks (worker-outbound-tasks worker)
@@ -399,7 +407,7 @@
                               (:data new-assignment)))
               my-assignment (-> assignment
                                 :executor->node+port
-                                to-task->node+port
+                                task->node_port
                                 (select-keys outbound-tasks)
                                 ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
                                 (#(map-val endpoint->string %)))
@@ -740,7 +748,7 @@
               [this]
               (shutdown*))
              DaemonCommon
-             (waiting? [this]
+             (isWaiting [this]
                (and
                  (.isTimerWaiting (:heartbeat-timer worker))
                  (.isTimerWaiting (:refresh-connections-timer worker))
@@ -810,6 +818,6 @@
 (defn -main [storm-id assignment-id port-str worker-id]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
     (Utils/setupDefaultUncaughtExceptionHandler)
-    (validate-distributed-mode! conf)
+    (StormCommon/validateDistributedMode conf)
     (let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)]
       (Utils/addShutdownHookWithForceKillIn1Sec #(.shutdown worker)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 66fc051..bda09ee 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -29,7 +29,7 @@
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
   (:import [java.util.concurrent ConcurrentHashMap])
-  (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState])
+  (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState StormCommonInstaller])
   (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
   (:import [org.apache.storm.task TopologyContext])
   (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
@@ -49,7 +49,8 @@
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext]
            (org.apache.storm.messaging IContext)
-           [org.json.simple JSONValue])
+           [org.json.simple JSONValue]
+           (org.apache.storm.daemon StormCommon Acker DaemonCommon))
   (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
   (:use [org.apache.storm util config log local-state-converter converter])
   (:use [org.apache.storm.internal thrift]))
@@ -285,13 +286,13 @@
   ([cluster-map timeout-ms]
   ;; wait until all workers, supervisors, and nimbus is waiting
   (let [supervisors @(:supervisors cluster-map)
-        workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
+        workers (filter (partial instance? DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
         daemons (concat
                   [(:nimbus cluster-map)]
                   supervisors
                   ; because a worker may already be dead
                   workers)]
-    (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
+    (while-timeout timeout-ms (not (every? (memfn isWaiting) daemons))
                    (Thread/sleep (rand-int 20))
                    ;;      (doseq [d daemons]
                    ;;        (if-not ((memfn waiting?) d)
@@ -352,7 +353,7 @@
 
 (defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources]
   (fn [existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+    (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
           existing-assignments (into {} (for [[tid assignment] existing-assignments]
                                           {tid (:worker->resources assignment)}))
           new-assignments (assoc existing-assignments topology-id worker->resources)]
@@ -360,7 +361,7 @@
 
 (defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port]
   (fn [new-scheduler-assignments existing-assignments]
-    (let [topology-id (common/get-storm-id storm-cluster-state storm-name)
+    (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
           existing-assignments (into {} (for [[tid assignment] existing-assignments]
                                           {tid (:executor->node+port assignment)}))
           new-assignments (assoc existing-assignments topology-id executor->node+port)]
@@ -372,17 +373,19 @@
 
 (defn submit-mocked-assignment
   [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources]
-  (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
-                   nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
-                   nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
-                                                          storm-cluster-state
-                                                          storm-name
-                                                          worker->resources)
-                   nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
-                                                                      storm-cluster-state
-                                                                      storm-name
-                                                                      executor->node+port)]
-    (submit-local-topology nimbus storm-name conf topology)))
+  (let [fake-common (proxy [StormCommon] []
+                      (stormTaskInfoImpl [_] task->component))]
+    (with-open [- (StormCommonInstaller. fake-common)]
+      (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments)
+                       nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources
+                                                              storm-cluster-state
+                                                              storm-name
+                                                              worker->resources)
+                       nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
+                                                                          storm-cluster-state
+                                                                          storm-name
+                                                                          executor->node+port)]
+        (submit-local-topology nimbus storm-name conf topology)))))
 
 (defn mk-capture-launch-fn [capture-atom]
   (fn [supervisor storm-id port worker-id mem-onheap]
@@ -437,9 +440,9 @@
   [cluster-map storm-name stat-key :component-ids nil]
   (let [state (:storm-cluster-state cluster-map)
         nimbus (:nimbus cluster-map)
-        storm-id (common/get-storm-id state storm-name)
+        storm-id (StormCommon/getStormId state storm-name)
         component->tasks (clojurify-structure (Utils/reverseMap
-                           (common/storm-task-info
+                           (StormCommon/stormTaskInfo
                              (.getUserTopology nimbus storm-id)
                              (->>
                                (.getTopologyConf nimbus storm-id)
@@ -590,7 +593,7 @@
     (submit-local-topology (:nimbus cluster-map) storm-name storm-conf topology)
     (advance-cluster-time cluster-map 11)
 
-    (let [storm-id (common/get-storm-id state storm-name)]
+    (let [storm-id (StormCommon/getStormId state storm-name)]
       ;;Give the topology time to come up without using it to wait for the spouts to complete
       (simulate-wait cluster-map)
 
@@ -667,34 +670,35 @@
 
 (defmacro with-tracked-cluster
   [[cluster-sym & cluster-args] & body]
-  `(let [id# (Utils/uuid)]
-     (RegisteredGlobalState/setState
-       id#
-       (doto (ConcurrentHashMap.)
-         (.put "spout-emitted" (AtomicInteger. 0))
-         (.put "transferred" (AtomicInteger. 0))
-         (.put "processed" (AtomicInteger. 0))))
-     (with-var-roots
-       [common/mk-acker-bolt
-        (let [old# common/mk-acker-bolt]
-         (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
-        ;; critical that this particular function is overridden here,
-        ;; since the transferred stat needs to be incremented at the moment
-        ;; of tuple emission (and not on a separate thread later) for
-        ;; topologies to be tracked correctly. This is because "transferred" *must*
-        ;; be incremented before "processing".
-        executor/mk-executor-transfer-fn
-        (let [old# executor/mk-executor-transfer-fn]
-          (fn [& args#]
-            (let [transferrer# (apply old# args#)]
-              (fn [& args2#]
-                ;; (log-message "Transferring: " transfer-args#)
-                (increment-global! id# "transferred" 1)
-                (apply transferrer# args2#)))))]
-       (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
-                           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
-                             ~@body)))
-     (RegisteredGlobalState/clearState id#)))
+  `(let [id# (Utils/uuid)
+         fake-common# (proxy [StormCommon] []
+                        (makeAckerBoltImpl [] (let [tracker-acker# (NonRichBoltTracker. (Acker.) (String. id#))]
+                                                tracker-acker#)))]
+    (with-open [-# (StormCommonInstaller. fake-common#)]
+      (RegisteredGlobalState/setState
+        id#
+        (doto (ConcurrentHashMap.)
+          (.put "spout-emitted" (AtomicInteger. 0))
+          (.put "transferred" (AtomicInteger. 0))
+          (.put "processed" (AtomicInteger. 0))))
+      (with-var-roots
+        [;; critical that this particular function is overridden here,
+         ;; since the transferred stat needs to be incremented at the moment
+         ;; of tuple emission (and not on a separate thread later) for
+         ;; topologies to be tracked correctly. This is because "transferred" *must*
+         ;; be incremented before "processing".
+         executor/mk-executor-transfer-fn
+         (let [old# executor/mk-executor-transfer-fn]
+           (fn [& args#]
+             (let [transferrer# (apply old# args#)]
+               (fn [& args2#]
+                 ;; (log-message "Transferring: " transfer-args#)
+                 (increment-global! id# "transferred" 1)
+                 (apply transferrer# args2#)))))]
+          (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
+                              (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+                                ~@body)))
+      (RegisteredGlobalState/clearState id#))))
 
 (defn tracked-wait
   "Waits until topology is idle and 'amt' more tuples have been emitted by spouts."

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 143ab14..d24fc14 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -23,9 +23,6 @@
   (:use [hiccup core page-helpers])
   (:use [org.apache.storm config util log stats converter])
   (:use [org.apache.storm.ui helpers])
-  (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
-                                              ACKER-FAIL-STREAM-ID mk-authorization-handler
-                                              start-metrics-reporters]]])
   (:import [org.apache.storm.utils Time]
            [org.apache.storm.generated NimbusSummary]
            [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
@@ -53,13 +50,14 @@
             [org.apache.storm.internal [thrift :as thrift]])
   (:require [metrics.meters :refer [defmeter mark!]])
   (:import [org.apache.commons.lang StringEscapeUtils])
-  (:import [org.apache.logging.log4j Level])
+  (:import [org.apache.logging.log4j Level]
+           (org.apache.storm.daemon StormCommon))
   (:import [org.eclipse.jetty.server Server])
   (:gen-class))
 
 (def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
-(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
-(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
+(def ^:dynamic *UI-ACL-HANDLER* (StormCommon/mkAuthorizationHandler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
+(def ^:dynamic *UI-IMPERSONATION-HANDLER* (StormCommon/mkAuthorizationHandler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
 (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 (def STORM-VERSION (VersionInfo/getVersion))
 
@@ -116,9 +114,9 @@
 (defn is-ack-stream
   [stream]
   (let [acker-streams
-        [ACKER-INIT-STREAM-ID
-         ACKER-ACK-STREAM-ID
-         ACKER-FAIL-STREAM-ID]]
+        [StormCommon/ACKER_INIT_STREAM_ID
+         StormCommon/ACKER_ACK_STREAM_ID
+         StormCommon/ACKER_FAIL_STREAM_ID]]
     (every? #(not= %1 stream) acker-streams)))
 
 (defn spout-summary?
@@ -1270,7 +1268,7 @@
           https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
           https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
           https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
-      (start-metrics-reporters conf)
+      (StormCommon/startMetricsReporters conf)
       (UIHelpers/stormRunJetty  (int (conf UI-PORT))
                                 (conf UI-HOST)
                                 https-port

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
new file mode 100644
index 0000000..d1b71a7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+public interface DaemonCommon {
+    public boolean isWaiting();
+}


Mime
View raw message