storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [50/50] [abbrv] git commit: Merge branch 'master' into security-upmerge
Date Wed, 11 Jun 2014 16:10:22 GMT
Merge branch 'master' into security-upmerge

Conflicts:
	.gitignore
	storm-core/src/clj/backtype/storm/daemon/drpc.clj
	storm-core/src/clj/backtype/storm/daemon/executor.clj
	storm-core/src/clj/backtype/storm/daemon/logviewer.clj
	storm-core/src/clj/backtype/storm/daemon/worker.clj
	storm-core/src/clj/backtype/storm/timer.clj
	storm-core/src/clj/backtype/storm/ui/core.clj
	storm-core/src/clj/backtype/storm/ui/helpers.clj
	storm-core/src/clj/backtype/storm/util.clj
	storm-core/src/jvm/backtype/storm/Config.java
	storm-core/src/jvm/backtype/storm/utils/Utils.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0a98bee2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0a98bee2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0a98bee2

Branch: refs/heads/security
Commit: 0a98bee214b46ed20b566a9b49c3eca2895f9fd5
Parents: 6592b82 c89fb82
Author: Robert (Bobby) Evans <bobby@apache.org>
Authored: Wed Jun 11 11:07:59 2014 -0500
Committer: Robert (Bobby) Evans <bobby@apache.org>
Committed: Wed Jun 11 11:07:59 2014 -0500

----------------------------------------------------------------------
 .gitignore                                      |    2 +
 CHANGELOG.md                                    |    8 +
 README.markdown                                 |    1 +
 conf/defaults.yaml                              |    9 +
 .../src/clj/backtype/storm/daemon/drpc.clj      |   27 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   43 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   70 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   10 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   81 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   20 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   63 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 1367 +++++++-----------
 storm-core/src/clj/backtype/storm/util.clj      |    3 +
 storm-core/src/jvm/backtype/storm/Config.java   |   19 +
 .../backtype/storm/messaging/IConnection.java   |   15 +-
 .../backtype/storm/messaging/netty/Client.java  |  388 ++---
 .../backtype/storm/messaging/netty/Context.java |   41 +-
 .../storm/messaging/netty/ControlMessage.java   |    6 +-
 .../storm/messaging/netty/MessageBatch.java     |    5 +
 .../storm/messaging/netty/MessageDecoder.java   |  108 +-
 .../netty/NettyRenameThreadFactory.java         |   35 +
 .../backtype/storm/messaging/netty/Server.java  |  145 +-
 .../netty/StormClientErrorHandler.java          |   41 +
 .../messaging/netty/StormClientHandler.java     |   87 --
 .../netty/StormClientPipelineFactory.java       |    2 +-
 .../messaging/netty/StormServerHandler.java     |   40 +-
 .../storm/testing/TestEventLogSpout.java        |  139 ++
 .../storm/testing/TestEventOrderCheckBolt.java  |   76 +
 .../backtype/storm/utils/DisruptorQueue.java    |    9 +-
 .../backtype/storm/utils/TransferDrainer.java   |  113 ++
 .../src/jvm/backtype/storm/utils/Utils.java     |   48 +-
 storm-core/src/ui/public/component.html         |   88 ++
 storm-core/src/ui/public/index.html             |   73 +
 storm-core/src/ui/public/js/arbor-graphics.js   |   51 +
 storm-core/src/ui/public/js/arbor-tween.js      |   86 ++
 storm-core/src/ui/public/js/arbor.js            |   67 +
 storm-core/src/ui/public/js/jquery.mustache.js  |  592 ++++++++
 storm-core/src/ui/public/js/purl.js             |  267 ++++
 storm-core/src/ui/public/js/script.js           |   52 +-
 storm-core/src/ui/public/js/visualization.js    |  403 ++++++
 .../templates/component-page-template.html      |  152 ++
 .../public/templates/index-page-template.html   |   74 +
 .../public/templates/json-error-template.html   |    4 +
 .../templates/topology-page-template.html       |  148 ++
 storm-core/src/ui/public/topology.html          |   87 ++
 .../test/clj/backtype/storm/drpc_test.clj       |   29 +-
 .../storm/messaging/netty_unit_test.clj         |   46 +-
 .../test/clj/backtype/storm/messaging_test.clj  |   35 +-
 .../test/clj/backtype/storm/metrics_test.clj    |   24 +
 storm-core/test/clj/backtype/storm/ui_test.clj  |   14 +-
 50 files changed, 3976 insertions(+), 1337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index d482065,39c4b92..7f17054
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -125,12 -92,13 +125,15 @@@ supervisor.supervisors.commands: [
  
  ### worker.* configs are for task workers
  worker.childopts: "-Xmx768m"
 +worker.gc.childopts: ""
  worker.heartbeat.frequency.secs: 1
  
+ # control how many worker receiver threads we need per worker 
+ topology.worker.receiver.thread.count: 1
+ 
  task.heartbeat.frequency.secs: 3
  task.refresh.poll.secs: 10
 +task.credentials.poll.secs: 30
  
  zmq.threads: 1
  zmq.linger.millis: 5000

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/clj/backtype/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/drpc.clj
index a7d3c32,03d8686..1548347
--- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
@@@ -95,9 -77,10 +98,11 @@@
                req (DRPCRequest. args id)
                ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)
                ]
-           (swap! id->func assoc id function)
++          (swap! id->function assoc id function)
            (swap! id->start assoc id (current-time-secs))
            (swap! id->sem assoc id sem)
+           (swap! id->function assoc id function)
+           (swap! id->request assoc id req)
            (.add queue req)
            (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
            (.acquire sem)
@@@ -111,30 -96,19 +118,30 @@@
                ))))
        DistributedRPCInvocations$Iface
        (^void result [this ^String id ^String result]
-         (when-let [func (@id->func id)]
 -        (let [^Semaphore sem (@id->sem id)]
 -          (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
 -          (when sem
 -            (swap! id->result assoc id result)
 -            (.release sem)
 -            )))
++        (when-let [func (@id->function id)]
 +          (check-authorization drpc-acl-handler
 +                               {DRPCAuthorizerBase/FUNCTION_NAME func}
 +                               "result")
 +          (let [^Semaphore sem (@id->sem id)]
 +            (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
 +            (when sem
 +              (swap! id->result assoc id result)
 +              (.release sem)
 +              ))))
        (^void failRequest [this ^String id]
-         (when-let [func (@id->func id)]
 -        (let [^Semaphore sem (@id->sem id)]
 -          (when sem
 -            (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
 -            (.release sem)
 -            )))
++        (when-let [func (@id->function id)]
 +          (check-authorization drpc-acl-handler
 +                               {DRPCAuthorizerBase/FUNCTION_NAME func}
 +                               "failRequest")
 +          (let [^Semaphore sem (@id->sem id)]
 +            (when sem
 +              (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
 +              (.release sem)
 +              ))))
        (^DRPCRequest fetchRequest [this ^String func]
 +        (check-authorization drpc-acl-handler
 +                             {DRPCAuthorizerBase/FUNCTION_NAME func}
 +                             "fetchRequest")
          (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
                ret (.poll queue)]
            (if ret

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/executor.clj
index 29cce08,1bbe53d..bc491d9
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@@ -447,12 -433,7 +448,12 @@@
                            (let [stream-id (.getSourceStreamId tuple)]
                              (condp = stream-id
                                Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
 +                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
 +                                (let [task-data (get task-datas task-id)
 +                                      spout-obj (:object task-data)]
 +                                  (when (instance? ICredentialsListener spout-obj)
 +                                    (.setCredentials spout-obj (.getValue tuple 0))))
                                (let [id (.getValue tuple 0)
                                      [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
                                  (when spout-id
@@@ -638,12 -617,7 +639,12 @@@
                            ;; need to do it this way to avoid reflection
                            (let [stream-id (.getSourceStreamId tuple)]
                              (condp = stream-id
 +                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
 +                                (let [task-data (get task-datas task-id)
 +                                      bolt-obj (:object task-data)]
 +                                  (when (instance? ICredentialsListener bolt-obj)
 +                                    (.setCredentials bolt-obj (.getValue tuple 0))))
-                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                                (let [task-data (get task-datas task-id)
                                      ^IBolt bolt-obj (:object task-data)
                                      user-context (:user-context task-data)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index 265ed4b,16765d9..d09ea35
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -18,11 -18,10 +18,13 @@@
    (:use [backtype.storm bootstrap])
    (:require [backtype.storm.daemon [executor :as executor]])
    (:import [java.util.concurrent Executors])
+   (:import [java.util ArrayList HashMap])
+   (:import [backtype.storm.utils TransferDrainer])
    (:import [backtype.storm.messaging TransportFactory])
-   (:import [backtype.storm.messaging IContext IConnection])
+   (:import [backtype.storm.messaging TaskMessage IContext IConnection])
 +  (:import [backtype.storm.security.auth AuthUtils])
 +  (:import [javax.security.auth Subject])
 +  (:import [java.security PrivilegedExceptionAction])
    (:gen-class))
  
  (bootstrap)
@@@ -118,28 -112,25 +120,33 @@@
    (let [local-tasks (-> worker :task-ids set)
          local-transfer (:transfer-local-fn worker)
          ^DisruptorQueue transfer-queue (:transfer-queue worker)
 -        task->node+port (:cached-task->node+port worker)]
 -    (fn [^KryoTupleSerializer serializer tuple-batch]
 -      (let [local (ArrayList.)
 -            remoteMap (HashMap.)]
 -        (fast-list-iter [[task tuple :as pair] tuple-batch]
 -          (if (local-tasks task)
 -            (.add local pair)
 -            
 -            ;;Using java objects directly to avoid performance issues in java code
 -            (let [node+port (get @task->node+port task)]
 -              (when (not (.get remoteMap node+port))
 -                (.put remoteMap node+port (ArrayList.)))
 -              (let [remote (.get remoteMap node+port)]
 -                (.add remote (TaskMessage. task (.serialize serializer tuple)))
 -                 ))))
 -        
 -        (local-transfer local)
 -        (disruptor/publish transfer-queue remoteMap)
 -          ))))
++        task->node+port (:cached-task->node+port worker)
 +        try-serialize-local ((:conf worker) TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE)
 +        transfer-fn
 +          (fn [^KryoTupleSerializer serializer tuple-batch]
 +            (let [local (ArrayList.)
-                   remote (ArrayList.)]
++                  remoteMap (HashMap.)]
 +              (fast-list-iter [[task tuple :as pair] tuple-batch]
 +                (if (local-tasks task)
-                   (.add local pair)
-                   (.add remote pair)
-               ))
-               (local-transfer local)
-               ;; not using map because the lazy seq shows up in perf profiles
-               (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
-                 (disruptor/publish transfer-queue serialized-pairs)
-               )))]
++                  (.add local pair) 
++
++                  ;;Using java objects directly to avoid performance issues in java code
++                  (let [node+port (get @task->node+port task)]
++                    (when (not (.get remoteMap node+port))
++                      (.put remoteMap node+port (ArrayList.)))
++                    (let [remote (.get remoteMap node+port)]
++                      (.add remote (TaskMessage. task (.serialize serializer tuple)))
++                     )))) 
++                (local-transfer local)
++                (disruptor/publish transfer-queue remoteMap)
++              ))]
 +    (if try-serialize-local
 +      (do 
 +        (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
 +        (fn [^KryoTupleSerializer serializer tuple-batch]
 +          (assert-can-serialize serializer tuple-batch)
 +          (transfer-fn serializer tuple-batch)))
 +      transfer-fn)))
  
  (defn- mk-receive-queue-map [storm-conf executors]
    (->> executors
@@@ -179,19 -171,24 +187,20 @@@
    (mk-timer :kill-fn (fn [t]
                         (log-error t "Error when processing event")
                         (halt-process! 20 "Error when processing an event")
-                        )))
+                        )
+             :timer-name timer-name))
  
 -(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
 -  (let [cluster-state (cluster/mk-distributed-cluster-state conf)
 -        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
 -        storm-conf (read-supervisor-storm-conf conf storm-id)
 -        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
 -        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
 -                                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
 -        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
 -        
 -        receive-queue-map (->> executor-receive-queue-map
 -                               (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
 -                               (into {}))
 -
 -        topology (read-supervisor-topology conf storm-id)]
 -    (recursive-map
 +(defn recursive-map-worker-data [conf mq-context storm-id assignment-id port
 +                                  storm-conf
 +                                  worker-id 
 +                                  cluster-state 
 +                                  storm-cluster-state
 +                                  executors 
 +                                  transfer-queue
 +                                  executor-receive-queue-map 
 +                                  receive-queue-map
 +                                  topology]
 +  (recursive-map
        :conf conf
        :mq-context (if mq-context
                        mq-context
@@@ -208,12 -205,11 +217,12 @@@
        :storm-conf storm-conf
        :topology topology
        :system-topology (system-topology! storm-conf topology)
-       :heartbeat-timer (mk-halting-timer)
-       :refresh-connections-timer (mk-halting-timer)
-       :refresh-credentials-timer (mk-halting-timer)
-       :refresh-active-timer (mk-halting-timer)
-       :executor-heartbeat-timer (mk-halting-timer)
-       :user-timer (mk-halting-timer)
+       :heartbeat-timer (mk-halting-timer "heartbeat-timer")
+       :refresh-connections-timer (mk-halting-timer "refresh-connections-timer")
++      :refresh-credentials-timer (mk-halting-timer "refresh-credentials-timer")
+       :refresh-active-timer (mk-halting-timer "refresh-active-timer")
+       :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer")
+       :user-timer (mk-halting-timer "user-timer")
        :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
        :component->stream->fields (component->stream->fields (:system-topology <>))
        :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
@@@ -232,32 -228,9 +241,33 @@@
        :default-shared-resources (mk-default-resources <>)
        :user-shared-resources (mk-user-resources <>)
        :transfer-local-fn (mk-transfer-local-fn <>)
+       :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
        :transfer-fn (mk-transfer-fn <>)
 -      )))
 +      ))
 +
 +(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
 +  (let [executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
-         transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
++        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
 +                                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
 +        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
 +        
 +        receive-queue-map (->> executor-receive-queue-map
 +                               (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
 +                               (into {}))
 +
 +        topology (read-supervisor-topology conf storm-id)]
 +        (recursive-map-worker-data 
 +          conf mq-context storm-id assignment-id port
 +          storm-conf
 +          worker-id 
 +          cluster-state 
 +          storm-cluster-state
 +          executors 
 +          transfer-queue
 +          executor-receive-queue-map 
 +          receive-queue-map
 +          topology)
 +  ))
  
  (defn- endpoint->string [[node port]]
    (str port "/" node))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index 6d7cd44,5f2bcba..216c304
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -19,18 -19,17 +19,18 @@@
    (:use [hiccup core page-helpers])
    (:use [backtype.storm config util log])
    (:use [backtype.storm.ui helpers])
-   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
+   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID
+                                               ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID system-id?]]])
    (:use [ring.adapter.jetty :only [run-jetty]])
 -  (:use [clojure.string :only [trim]])
 +  (:use [clojure.string :only [blank? lower-case trim]])
    (:import [backtype.storm.utils Utils])
    (:import [backtype.storm.generated ExecutorSpecificStats
              ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
              ErrorInfo ClusterSummary SupervisorSummary TopologySummary
              Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
              KillOptions])
 +  (:import [backtype.storm.security.auth AuthUtils])
-   (:import [java.io File PrintWriter StringWriter])
-   (:import [java.net URLDecoder])
+   (:import [java.io File])
    (:require [compojure.route :as route]
              [compojure.handler :as handler]
              [ring.util.response :as resp]
@@@ -45,6 -44,6 +45,30 @@@
       ~@body
       ))
  
++(defn authorized-ui-user? [user conf topology-conf]
++  (let [ui-users (concat (conf UI-USERS)
++                         (conf NIMBUS-ADMINS)
++                         (topology-conf UI-USERS)
++                         (topology-conf TOPOLOGY-USERS))]
++    (or (blank? (conf UI-FILTER))
++        (and (not (blank? user))
++          (some #(= % user) ui-users)))))
++
++(defn assert-authorized-ui-user [user conf topology-conf]
++  (if (not (authorized-ui-user? user conf topology-conf))
++    ;;TODO need a better exception here so the UI can appear better
++    (throw (RuntimeException. (str "User " user " is not authorized.")))))
++
++(defn- ui-actions-enabled? []
++  (= "true" (lower-case (*STORM-CONF* UI-ACTIONS-ENABLED))))
++
++(defn assert-authorized-topology-user [user]
++  ;;TODO eventually we will want to use the Authorizatin handler from nimbus, but for now
++  ;; Disable the calls conditionally
++  (if (not (ui-actions-enabled?))
++    ;;TODO need a better exception here so the UI can appear better
++    (throw (RuntimeException. (str "Topology actions for the UI have been disabled")))))
++
  (defn get-filled-stats [summs]
    (->> summs
         (map #(.get_stats ^ExecutorSummary %))
@@@ -534,27 -239,23 +264,24 @@@
                     reverse
                     first)]
      (if error
-       [:span (if (< (time-delta (.get_error_time_secs ^ErrorInfo error))
-                     (* 60 30))
-                {:class "red"}
-                {})
-        (error-subset (.get_error ^ErrorInfo error))]
-       )))
+       (error-subset (.get_error ^ErrorInfo error))
+       "")))
  
- (defn component-link [storm-id id]
-   (link-to (url-format "/topology/%s/component/%s" storm-id id) (escape-html id)))
+ (defn component-task-summs [^TopologyInfo summ topology id]
+   (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
+         bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
+         spout-comp-summs (group-by-comp spout-summs)
+         bolt-comp-summs (group-by-comp bolt-summs)
+         ret (if (contains? spout-comp-summs id)
+               (spout-comp-summs id)
+               (bolt-comp-summs id))]
+     (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)
+     ))
  
 -(defn worker-log-link [host port]
 -  (url-format "http://%s:%s/log?file=worker-%s.log"
 -              host (*STORM-CONF* LOGVIEWER-PORT) port))
 +(defn worker-log-link [host port topology-id]
 +  (let [fname (logs-filename topology-id port)]
-     (link-to (url-format (str "http://%s:%s/log?file=%s")
-                 host (*STORM-CONF* LOGVIEWER-PORT) fname) (str port))))
- 
- (defn render-capacity [capacity]
-   (let [capacity (nil-to-zero capacity)]
-     [:span (if (> capacity 0.9)
-                  {:class "red"}
-                  {})
-            (float-str capacity)]))
++    (url-format (str "http://%s:%s/log?file=%s")
++          host (*STORM-CONF* LOGVIEWER-PORT) fname)))
  
  (defn compute-executor-capacity [^ExecutorSummary e]
    (let [stats (.get_stats e)
@@@ -675,30 -329,239 +355,244 @@@
                           (StringEscapeUtils/escapeJavaScript name) "', '"
                           command "', " is-wait ", " default-wait ")")}])
  
- (defn- ui-actions-enabled? []
-   (= "true" (lower-case (*STORM-CONF* UI-ACTIONS-ENABLED))))
+ (defn sanitize-stream-name [name]
+   (let [sym-regex #"(?![A-Za-z_\-:\.])."]
+     (str
+      (if (re-find #"^[A-Za-z]" name)
+        (clojure.string/replace name sym-regex "_")
+        (clojure.string/replace (str \s name) sym-regex "_"))
+      (hash name))))
+ 
+ (defn sanitize-transferred [transferred]
+   (into {}
+         (for [[time, stream-map] transferred]
+           [time, (into {} 
+                        (for [[stream, trans] stream-map]
+                          [(sanitize-stream-name stream), trans]))])))
+ 
+ (defn visualization-data [spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
+   (let [components (for [[id spec] spout-bolt]
+             [id 
+              (let [inputs (.get_inputs (.get_common spec))
+                    bolt-summs (get bolt-comp-summs id)
+                    spout-summs (get spout-comp-summs id)
+                    bolt-cap (if bolt-summs
+                               (compute-bolt-capacity bolt-summs)
+                               0)]
+                {
+                 :type                (if bolt-summs
+                                        "bolt"
+                                        "spout")
+ 
+                 :capacity            bolt-cap
+ 
+                 :latency             (if bolt-summs
+                                        (get-in (bolt-streams-stats bolt-summs true) [:process-latencies window])
+                                        (get-in (spout-streams-stats spout-summs true) [:complete-latencies window]))
+ 
+                 :transferred         (or
+                                       (get-in (spout-streams-stats spout-summs true) [:transferred window]) 
+                                       (get-in (bolt-streams-stats bolt-summs true) [:transferred window]))
+                 :stats               (let [mapfn (fn [dat]
+                                                    (map (fn [^ExecutorSummary summ]
+                                                           {:host (.get_host summ)
+                                                            :port (.get_port summ)
+                                                            :uptime_secs (.get_uptime_secs summ)
+                                                            :transferred (if-let [stats (.get_stats summ)]
+                                                                           (sanitize-transferred (.get_transferred stats)))})
+                                                         dat))]
+                                        (if bolt-summs
+                                          (mapfn bolt-summs)
+                                          (mapfn spout-summs)))
+ 
+                 :link                (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
+ 
+                 :inputs              (for [[global-stream-id group] inputs]
+                                        {:component (.get_componentId global-stream-id) 
+                                         :stream (.get_streamId global-stream-id)
+                                         :sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
+                                         :grouping (clojure.core/name (thrift/grouping-type group))})})])]
+     (into {} (doall components))))
+ 
+ (defn stream-boxes [datmap]
+   (let [filter-fn (mk-include-sys-fn true)
+         streams
+         (vec (doall (distinct
+                      (apply concat
+                             (for [[k v] datmap]
+                               (for [m (get v :inputs)]
+                                 { :stream (get m :stream) :sani-stream (get m :sani-stream) :checked (is-ack-stream (get m :stream))}))))))]
+     (map (fn [row] 
+            {:row row}) (partition 4 4 nil streams))))
+   
 -(defn mk-visualization-data [id window include-sys?]
++(defn mk-visualization-data [id window include-sys? user]
+   (with-nimbus nimbus
+     (let [window (if window window ":all-time")
+           topology (.getTopology ^Nimbus$Client nimbus id)
+           spouts (.get_spouts topology)
+           bolts (.get_bolts topology)
+           summ (.getTopologyInfo ^Nimbus$Client nimbus id)
+           spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
+           bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
+           spout-comp-summs (group-by-comp spout-summs)
+           bolt-comp-summs (group-by-comp bolt-summs)
+           bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?) bolt-comp-summs)
+           topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus id))]
++      (assert-authorized-ui-user user *STORM-CONF* topology-conf)
+       (visualization-data 
+        (merge (hashmap-to-persistent spouts)
+               (hashmap-to-persistent bolts))
+        spout-comp-summs 
+        bolt-comp-summs 
+        window 
+        id))))
+ 
+ (defn cluster-configuration []
+   (with-nimbus nimbus
+     (.getNimbusConf ^Nimbus$Client nimbus)))
+ 
+ (defn cluster-summary
+   ([]
+      (with-nimbus nimbus
+         (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus))))
+   ([^ClusterSummary summ]
+      (let [sups (.get_supervisors summ)
+         used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
+         total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
+         free-slots (- total-slots used-slots)
+         total-tasks (->> (.get_topologies summ)
+                          (map #(.get_num_tasks ^TopologySummary %))
+                          (reduce +))
+         total-executors (->> (.get_topologies summ)
+                              (map #(.get_num_executors ^TopologySummary %))
+                              (reduce +))]
+        { "stormVersion" (read-storm-version)
+          "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
+          "supervisors" (count sups)
+          "slotsTotal" total-slots
+          "slotsUsed"  used-slots
+          "slotsFree" free-slots
+          "executorsTotal" total-executors
+          "tasksTotal" total-tasks })))
+ 
+ (defn supervisor-summary
+   ([]
+      (with-nimbus nimbus
+        (supervisor-summary (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))
+        ))
+   ([summs]
+      {"supervisors"
+       (for [^SupervisorSummary s summs]
+             {"id" (.get_supervisor_id s)
+              "host" (.get_host s)
+              "uptime" (pretty-uptime-sec (.get_uptime_secs s))
+              "slotsTotal" (.get_num_workers s)
+              "slotsUsed" (.get_num_used_workers s)})}))
+ 
+ (defn all-topologies-summary
+   ([]
+      (with-nimbus nimbus
+        (all-topologies-summary (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
+   ([summs]
+      {"topologies"
+       (for [^TopologySummary t summs]
+         {"id" (.get_id t)
++         "owner" (.get_owner t)
+          "name" (.get_name t)
+          "status" (.get_status t)
+          "uptime" (pretty-uptime-sec (.get_uptime_secs t))
+          "tasksTotal" (.get_num_tasks t)
+          "workersTotal" (.get_num_workers t)
 -         "executorsTotal" (.get_num_executors t)})
++         "executorsTotal" (.get_num_executors t)
++         "schedulerInfo" (.get_sched_status t)})
+       }))
+ 
+ (defn topology-stats [id window stats]
+   (let [times (stats-times (:emitted stats))
+         display-map (into {} (for [t times] [t pretty-uptime-sec]))
+         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
+     (for [k (concat times [":all-time"])
+            :let [disp ((display-map k) k)]]
+       { "windowPretty" disp
+         "window" k
+         "emitted" (get-in stats [:emitted k])
+         "transferred" (get-in stats [:transferred k])
+         "completeLatency" (float-str (get-in stats [:complete-latencies k]))
+         "acked" (get-in stats [:acked k])
+         "failed" (get-in stats [:failed k])
+         }
+       )))
  
- (defn- topology-actions [id name status msg-timeout]
-   (if (ui-actions-enabled?)
-     (concat
-        [[:h2 {:class "js-only"} "Topology actions"]]
-        [[:p {:class "js-only"} (concat
-          [(topology-action-button id name "Activate" "activate" false 0 (= "INACTIVE" status))]
-          [(topology-action-button id name "Deactivate" "deactivate" false 0 (= "ACTIVE" status))]
-          [(topology-action-button id name "Rebalance" "rebalance" true msg-timeout (or (= "ACTIVE" status) (= "INACTIVE" status)))]
-          [(topology-action-button id name "Kill" "kill" true msg-timeout (not= "KILLED" status))]
-        )]] )
-     []))
+ (defn spout-comp [top-id summ-map errors window include-sys?]
+   (for [[id summs] summ-map
+         :let [stats-seq (get-filled-stats summs)
+               stats (aggregate-spout-streams
+                      (aggregate-spout-stats
+                       stats-seq include-sys?))]]
+     {"spoutId" id
+      "executors" (count summs)
+      "tasks" (sum-tasks summs)
+      "emitted" (get-in stats [:emitted window])
+      "transferred" (get-in stats [:transferred window])
+      "completeLatency" (float-str (get-in stats [:complete-latencies window]))
+      "acked" (get-in stats [:acked window])
+      "failed" (get-in stats [:failed window])
+      "lastError" (most-recent-error (get errors id))
+      }))
+ 
+ (defn bolt-comp [top-id summ-map errors window include-sys?]
+   (for [[id summs] summ-map
+         :let [stats-seq (get-filled-stats summs)
+               stats (aggregate-bolt-streams
+                      (aggregate-bolt-stats
+                       stats-seq include-sys?))
+               ]]
+     {"boltId" id
+      "executors" (count summs)
+      "tasks" (sum-tasks summs)
+      "emitted" (get-in stats [:emitted window])
+      "transferred" (get-in stats [:transferred window])
+      "capacity" (float-str (nil-to-zero (compute-bolt-capacity summs)))
+      "executeLatency" (float-str (get-in stats [:execute-latencies window]))
+      "executed" (get-in stats [:executed window])
+      "processLatency" (float-str (get-in stats [:process-latencies window]))
+      "acked" (get-in stats [:acked window])
+      "failed" (get-in stats [:failed window])
+      "lastError" (most-recent-error (get errors id))
+      }
+     ))
  
- (defn authorized-ui-user? [user conf topology-conf]
-   (let [ui-users (concat (conf UI-USERS)
-                          (conf NIMBUS-ADMINS)
-                          (topology-conf UI-USERS)
-                          (topology-conf TOPOLOGY-USERS))]
-     (and (not (blank? user))
-          (some #(= % user) ui-users))))
+ (defn topology-summary [^TopologyInfo summ]
+   (let [executors (.get_executors summ)
+         workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))]
+       {"id" (.get_id summ)
++       "owner" (.get_owner summ)
+        "name" (.get_name summ)
+        "status" (.get_status summ)
+        "uptime" (pretty-uptime-sec (.get_uptime_secs summ))
+        "tasksTotal" (sum-tasks executors)
+        "workersTotal" (count workers)
 -       "executorsTotal" (count executors)}
++       "executorsTotal" (count executors)
++       "schedulerInfo" (.get_sched_status summ)}
+       ))
+ 
+ (defn spout-summary-json [topology-id id stats window]
+   (let [times (stats-times (:emitted stats))
+         display-map (into {} (for [t times] [t pretty-uptime-sec]))
+         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
+      (for [k (concat times [":all-time"])
+            :let [disp ((display-map k) k)]]
+        {"windowPretty" disp
+         "window" k
+         "emitted" (get-in stats [:emitted k])
+         "transferred" (get-in stats [:transferred k])
+         "completeLatency" (float-str (get-in stats [:complete-latencies k]))
+         "acked" (get-in stats [:acked k])
+         "failed" (get-in stats [:failed k])
+         }
+        )))
  
 -(defn topology-page [id window include-sys?]
 +(defn topology-page [id window include-sys? user]
    (with-nimbus nimbus
      (let [window (if window window ":all-time")
            window-hint (window-hint window)
@@@ -713,134 -576,74 +607,76 @@@
            name (.get_name summ)
            status (.get_status summ)
            msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+           spouts (.get_spouts topology)
+           bolts (.get_bolts topology)
+           visualizer-data (visualization-data (merge (hashmap-to-persistent spouts)
+                                                      (hashmap-to-persistent bolts))
+                                               spout-comp-summs
+                                               bolt-comp-summs
+                                               window
+                                               id)
            ]
-       (if (or (blank? (*STORM-CONF* UI-FILTER))
-               (authorized-ui-user? user *STORM-CONF* topology-conf))
-         (concat
-           [[:h2 "Topology summary"]]
-           [(topology-summary-table summ)]
-           (topology-actions id name status msg-timeout)
-           [[:h2 "Topology stats"]]
-           (topology-stats-table id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
-           [[:h2 "Spouts (" window-hint ")"]]
-           (spout-comp-table id spout-comp-summs (.get_errors summ) window include-sys?)
-           [[:h2 "Bolts (" window-hint ")"]]
-           (bolt-comp-table id bolt-comp-summs (.get_errors summ) window include-sys?)
-           [[:h2 "Topology Configuration"]]
-           (configuration-table topology-conf)
-           [(mk-system-toggle-button include-sys?)]
-         )
- 
-         (unauthorized-user-html user)))))
++      (assert-authorized-ui-user user *STORM-CONF* topology-conf)
+       (merge
+        (topology-summary summ)
+        {"window" window
+         "windowHint" window-hint
+         "msgTimeout" msg-timeout
+         "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
+         "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
+         "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
+         "configuration" topology-conf
+         "visualizationTable" (stream-boxes visualizer-data)
++        "uiActionsEnabled" (ui-actions-enabled?)
+         })
+       )))
  
- (defn component-task-summs [^TopologyInfo summ topology id]
-   (let [spout-summs (filter (partial spout-summary? topology) (.get_executors summ))
-         bolt-summs (filter (partial bolt-summary? topology) (.get_executors summ))
-         spout-comp-summs (group-by-comp spout-summs)
-         bolt-comp-summs (group-by-comp bolt-summs)
-         ret (if (contains? spout-comp-summs id)
-               (spout-comp-summs id)
-               (bolt-comp-summs id))]
-     (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)
-     ))
  
- (defn spout-summary-table [topology-id id stats window]
-   (let [times (stats-times (:emitted stats))
-         display-map (into {} (for [t times] [t pretty-uptime-sec]))
-         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-     (sorted-table
-      [{:text "Window" :attr {:class "tip right"
-                              :title (:window tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Complete latency (ms)" :attr {:class "tip above"
-                                             :title (:complete-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:spout-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                             :title (:spout-failed tips)}}]
-      (for [k (concat times [":all-time"])
-            :let [disp ((display-map k) k)]]
-        [(link-to (if (= k window) {:class "red"} {})
-                  (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
-                  (escape-html disp))
-         (get-in stats [:emitted k])
-         (get-in stats [:transferred k])
-         (float-str (get-in stats [:complete-latencies k]))
-         (get-in stats [:acked k])
-         (get-in stats [:failed k])
-         ])
-      :time-cols [0])))
- 
- (defn spout-output-summary-table [stream-summary window]
+ (defn spout-output-stats [stream-summary window]
    (let [stream-summary (map-val swap-map-order (swap-map-order stream-summary))]
-     (sorted-table
-      [{:text "Stream" :attr {:class "tip right"
-                              :title (:stream tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Complete latency (ms)" :attr {:class "tip above"
-                                             :title (:complete-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:spout-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                             :title (:spout-failed tips)}}]
-      (for [[s stats] (stream-summary window)]
-        [s
-         (nil-to-zero (:emitted stats))
-         (nil-to-zero (:transferred stats))
-         (float-str (:complete-latencies stats))
-         (nil-to-zero (:acked stats))
-         (nil-to-zero (:failed stats))])
-      )))
+     (for [[s stats] (stream-summary window)]
+       {"stream" s
+        "emitted" (nil-to-zero (:emitted stats))
+        "transferred" (nil-to-zero (:transferred stats))
+        "completeLatency" (float-str (:complete-latencies stats))
+        "acked" (nil-to-zero (:acked stats))
+        "failed" (nil-to-zero (:failed stats))
+        }
+       )))
  
- (defn spout-executor-table [topology-id executors window include-sys?]
-   (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (:exec-id tips)}}
-     {:text "Uptime" :attr {:class "tip right"
-                            :title (:exec-uptime tips)}}
-     {:text "Host" :attr {:class "tip above"
-                          :title (:sup-host tips)}}
-     {:text "Port" :attr {:class "tip above"
-                          :title (:port tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Complete latency (ms)" :attr {:class "tip above"
-                                           :title (:complete-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:spout-acked tips)}}
-     {:text "Failed" :attr {:class "tip left"
-                           :title (:spout-failed tips)}}]
-    (for [^ExecutorSummary e executors
-          :let [stats (.get_stats e)
-                stats (if stats
-                        (-> stats
-                            (aggregate-spout-stats include-sys?)
-                            aggregate-spout-streams
-                            swap-map-order
-                            (get window)))]]
-      [(pretty-executor-info (.get_executor_info e))
-       (pretty-uptime-sec (.get_uptime_secs e))
-       (.get_host e)
-       (worker-log-link (.get_host e) (.get_port e) topology-id)
-       (nil-to-zero (:emitted stats))
-       (nil-to-zero (:transferred stats))
-       (float-str (:complete-latencies stats))
-       (nil-to-zero (:acked stats))
-       (nil-to-zero (:failed stats))
-       ]
-      )
-    :time-cols [1]
-    ))
- 
- (defn spout-page [window ^TopologyInfo topology-info component executors include-sys?]
+ (defn spout-executor-stats [topology-id executors window include-sys?]
+   (for [^ExecutorSummary e executors
+         :let [stats (.get_stats e)
+               stats (if stats
+                       (-> stats
+                           (aggregate-spout-stats include-sys?)
+                           aggregate-spout-streams
+                           swap-map-order
+                           (get window)))]]
+     {"id" (pretty-executor-info (.get_executor_info e))
+      "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+      "host" (.get_host e)
+      "port" (.get_port e)
+      "emitted" (nil-to-zero (:emitted stats))
+      "transferred" (nil-to-zero (:transferred stats))
+      "completeLatency" (float-str (:complete-latencies stats))
+      "acked" (nil-to-zero (:acked stats))
+      "failed" (nil-to-zero (:failed stats))
 -     "workerLogLink" (worker-log-link (.get_host e) (.get_port e))
++     "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)
+      }
+   ))
+ 
+ (defn component-errors [errors-list]
+   (let [errors (->> errors-list
+                     (sort-by #(.get_error_time_secs ^ErrorInfo %))
+                     reverse)]
+     {"componentErrors"
+      (for [^ErrorInfo e errors]
+        {"time" (date-str (.get_error_time_secs e))
+          "error" (.get_error e)
+          })}))
+ 
+ (defn spout-stats [window ^TopologyInfo topology-info component executors include-sys?]
    (let [window-hint (str " (" (window-hint window) ")")
          stats (get-filled-stats executors)
          stream-summary (-> stats (aggregate-spout-stats include-sys?))
@@@ -881,265 -687,148 +720,164 @@@
                             (get window)
                             (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
                             swap-map-order)]
-     (sorted-table
-      [{:text "Component" :attr {:class "tip right"
-                          :title (:comp-id tips)}}
-       {:text "Stream" :attr {:class "tip right"
-                              :title (:stream tips)}}
-       {:text "Execute latency (ms)" :attr {:class "tip above"
-                                            :title (:exec-lat tips)}}
-       {:text "Executed" :attr {:class "tip above"
-                                :title (:num-executed tips)}}
-       {:text "Process latency (ms)":attr {:class "tip above"
-                                           :title (:proc-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:bolt-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:bolt-failed tips)}}]
-      (for [[^GlobalStreamId s stats] stream-summary]
-        [(escape-html (.get_componentId s))
-         (.get_streamId s)
-         (float-str (:execute-latencies stats))
-         (nil-to-zero (:executed stats))
-         (float-str (:process-latencies stats))
-         (nil-to-zero (:acked stats))
-         (nil-to-zero (:failed stats))
-         ])
-      )))
- 
- (defn bolt-executor-table [topology-id executors window include-sys?]
-   (sorted-table
-    [{:text "Id" :attr {:class "tip right"
-                        :title (:exec-id tips)}}
-     {:text "Uptime" :attr {:class "tip right"
-                            :title (:exec-uptime tips)}}
-     {:text "Host" :attr {:class "tip above"
-                          :title (:sup-host tips)}}
-     {:text "Port" :attr {:class "tip above"
-                          :title (:port tips)}}
-     {:text "Emitted" :attr {:class "tip above"
-                             :title (:emitted tips)}}
-     {:text "Transferred" :attr {:class "tip above"
-                                 :title (:transferred tips)}}
-     {:text "Capacity (last 10m)" :attr {:class "tip above"
-                                         :title (:capacity tips)}}
-     {:text "Execute latency (ms)" :attr {:class "tip above"
-                                          :title (:exec-lat tips)}}
-     {:text "Executed" :attr {:class "tip above"
-                              :title (:num-executed tips)}}
-     {:text "Process latency (ms)":attr {:class "tip above"
-                                         :title (:proc-lat tips)}}
-     {:text "Acked" :attr {:class "tip above"
-                           :title (:bolt-acked tips)}}
-     {:text "Failed" :attr {:class "tip left"
-                            :title (:bolt-failed tips)}}]
-    (for [^ExecutorSummary e executors
-          :let [stats (.get_stats e)
-                stats (if stats
-                        (-> stats
-                            (aggregate-bolt-stats include-sys?)
-                            (aggregate-bolt-streams)
-                            swap-map-order
-                            (get window)))]]
-      [(pretty-executor-info (.get_executor_info e))
-       (pretty-uptime-sec (.get_uptime_secs e))
-       (.get_host e)
-       (worker-log-link (.get_host e) (.get_port e) topology-id)
-       (nil-to-zero (:emitted stats))
-       (nil-to-zero (:transferred stats))
-       (render-capacity (compute-executor-capacity e))
-       (float-str (:execute-latencies stats))
-       (nil-to-zero (:executed stats))
-       (float-str (:process-latencies stats))
-       (nil-to-zero (:acked stats))
-       (nil-to-zero (:failed stats))
-       ]
-      )
-    :time-cols [1]
-    ))
- 
- (defn bolt-summary-table [topology-id id stats window]
-   (let [times (stats-times (:emitted stats))
-         display-map (into {} (for [t times] [t pretty-uptime-sec]))
-         display-map (assoc display-map ":all-time" (fn [_] "All time"))]
-     (sorted-table
-      [{:text "Window" :attr {:class "tip right"
-                              :title (:window tips)}}
-       {:text "Emitted" :attr {:class "tip above"
-                               :title (:emitted tips)}}
-       {:text "Transferred" :attr {:class "tip above"
-                                   :title (:transferred tips)}}
-       {:text "Execute latency (ms)" :attr {:class "tip above"
-                                            :title (:exec-lat tips)}}
-       {:text "Executed" :attr {:class "tip above"
-                                :title (:num-executed tips)}}
-       {:text "Process latency (ms)":attr {:class "tip above"
-                                           :title (:proc-lat tips)}}
-       {:text "Acked" :attr {:class "tip above"
-                             :title (:bolt-acked tips)}}
-       {:text "Failed" :attr {:class "tip left"
-                              :title (:bolt-failed tips)}}]
-      (for [k (concat times [":all-time"])
-            :let [disp ((display-map k) k)]]
-        [(link-to (if (= k window) {:class "red"} {})
-                  (url-format "/topology/%s/component/%s?window=%s" topology-id id k)
-                  (escape-html disp))
-         (get-in stats [:emitted k])
-         (get-in stats [:transferred k])
-         (float-str (get-in stats [:execute-latencies k]))
-         (get-in stats [:executed k])
-         (float-str (get-in stats [:process-latencies k]))
-         (get-in stats [:acked k])
-         (get-in stats [:failed k])
-         ])
-      :time-cols [0])))
- 
- (defn bolt-page [window ^TopologyInfo topology-info component executors include-sys?]
+     (for [[^GlobalStreamId s stats] stream-summary]
+       {"component" (.get_componentId s)
+         "stream" (.get_streamId s)
+         "executeLatency" (float-str (:execute-latencies stats))
+         "processLatency" (float-str (:execute-latencies stats))
+         "executed" (nil-to-zero (:executed stats))
+         "acked" (nil-to-zero (:acked stats))
+         "failed" (nil-to-zero (:failed stats))
+         })))
+ 
+ (defn bolt-executor-stats [topology-id executors window include-sys?]
+   (for [^ExecutorSummary e executors
+         :let [stats (.get_stats e)
+               stats (if stats
+                       (-> stats
+                           (aggregate-bolt-stats include-sys?)
+                           (aggregate-bolt-streams)
+                           swap-map-order
+                           (get window)))]]
+     {"id" (pretty-executor-info (.get_executor_info e))
+       "uptime" (pretty-uptime-sec (.get_uptime_secs e))
+       "host" (.get_host e)
+       "port" (.get_port e)
+       "emitted" (nil-to-zero (:emitted stats))
+       "transferred" (nil-to-zero (:transferred stats))
+       "capacity" (float-str (nil-to-zero (compute-executor-capacity e)))
+       "executeLatency" (float-str (:execute-latencies stats))
+       "executed" (nil-to-zero (:executed stats))
+       "processLatency" (float-str (:process-latencies stats))
+       "acked" (nil-to-zero (:acked stats))
+       "failed" (nil-to-zero (:failed stats))
 -      "workerLogLink" (worker-log-link (.get_host e) (.get_port e))
++      "workerLogLink" (worker-log-link (.get_host e) (.get_port e) topology-id)
+       }))
+ 
+ (defn bolt-stats [window ^TopologyInfo topology-info component executors include-sys?]
    (let [window-hint (str " (" (window-hint window) ")")
          stats (get-filled-stats executors)
          stream-summary (-> stats (aggregate-bolt-stats include-sys?))
          summary (-> stream-summary aggregate-bolt-streams)]
-     (concat
-      [[:h2 "Bolt stats"]]
-      (bolt-summary-table (.get_id topology-info) component summary window)
- 
-      [[:h2 "Input stats" window-hint]]
-      (bolt-input-summary-table stream-summary window)
- 
-      [[:h2 "Output stats" window-hint]]
-      (bolt-output-summary-table stream-summary window)
- 
-      [[:h2 "Executors"]]
-      (bolt-executor-table (.get_id topology-info) executors window include-sys?)
-      )))
- 
- (defn errors-table [errors-list]
-   (let [errors (->> errors-list
-                     (sort-by #(.get_error_time_secs ^ErrorInfo %))
-                     reverse)]
-     (sorted-table
-      ["Time" "Error"]
-      (for [^ErrorInfo e errors]
-        [(date-str (.get_error_time_secs e))
-         [:pre (.get_error e)]])
-      :sort-list "[[0,1]]"
-      )))
+     {"boltStats" (bolt-summary (.get_id topology-info) component summary window)
+      "inputStats" (bolt-input-stats stream-summary window)
+      "outputStats" (bolt-output-stats stream-summary window)
+      "executorStats" (bolt-executor-stats (.get_id topology-info) executors window include-sys?)}
+     ))
  
 -(defn component-page [topology-id component window include-sys?]
 +(defn component-page [topology-id component window include-sys? user]
    (with-nimbus nimbus
      (let [window (if window window ":all-time")
            summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
            topology (.getTopology ^Nimbus$Client nimbus topology-id)
++          topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))
            type (component-type topology component)
            summs (component-task-summs summ topology component)
-           spec (cond (= type :spout) (spout-page window summ component summs include-sys?)
-                      (= type :bolt) (bolt-page window summ component summs include-sys?))
-           topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))]
-       (if (or (blank? (*STORM-CONF* UI-FILTER))
-               (authorized-ui-user? user *STORM-CONF* topology-conf))
-         (concat
-           [[:h2 "Component summary"]
-            (table [{:text "Id" :attr {:class "tip right"
-                                       :title (:comp-id tips)}}
-                    {:text "Topology" :attr {:class "tip above"
-                                       :title (str (:name tips) " " (:name-link tips))}}
-                    {:text "Executors" :attr {:class "tip above"
-                                       :title (:num-execs tips)}}
-                    {:text "Tasks" :attr {:class "tip above"
-                                   :title (:num-tasks tips)}}]
-                   [[(escape-html component)
-                     (topology-link (.get_id summ) (.get_name summ))
-                     (count summs)
-                     (sum-tasks summs)
-                     ]])]
-           spec
-           [[:h2 "Errors"]
-            (errors-table (get (.get_errors summ) component))
-            (mk-system-toggle-button include-sys?)])
- 
-         (unauthorized-user-html user)))))
- 
- (defn get-include-sys? [cookies]
-   (let [sys? (get cookies "sys")
-         sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)]
-     sys?))
+           spec (cond (= type :spout) (spout-stats window summ component summs include-sys?)
+                      (= type :bolt) (bolt-stats window summ component summs include-sys?))
+           errors (component-errors (get (.get_errors summ) component))]
++      (assert-authorized-ui-user user *STORM-CONF* topology-conf)
+       (merge
+        {"id" component
+          "name" (.get_name summ)
+          "executors" (count summs)
+          "tasks" (sum-tasks summs)
+          "topologyId" topology-id
+          "window" window
+          "componentType" (name type)
+          "windowHint" (window-hint window)
+          } spec errors))))
+ 
+ (defn check-include-sys? [sys?]
+   (if (or (nil? sys?) (= "false" sys?)) false true))
+ 
+ (defn json-response [data & [status]]
+   {:status (or status 200)
+    :headers {"Content-Type" "application/json"}
+    :body (to-json data)
+    })
  
 +(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 +
- (if (ui-actions-enabled?)
-   (defroutes main-routes
-     (GET "/" [:as {servlet-request :servlet-request}]
-          (ui-template (main-page)
-                       (.getUserName http-creds-handler servlet-request)))
-     (GET "/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
-          (let [include-sys? (get-include-sys? cookies)
-                user (.getUserName http-creds-handler servlet-request)]
-            (ui-template (topology-page (URLDecoder/decode id) (:window m) include-sys? user)
-                         user)))
-     (GET "/topology/:id/component/:component" [:as {:keys [cookies servlet-request]}
-                                                id component & m]
-          (let [include-sys? (get-include-sys? cookies)
-                user (.getUserName http-creds-handler servlet-request)]
-            (ui-template (component-page (URLDecoder/decode id) component (:window m) include-sys? user)
-                         user)))
-     (POST "/topology/:id/activate" [id]
-       (with-nimbus nimbus
-         (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus (URLDecoder/decode id))
-               name (.get_name tplg)]
-           (.activate nimbus name)
-           (log-message "Activating topology '" name "'")))
-       (resp/redirect (str "/topology/" id)))
-     (POST "/topology/:id/deactivate" [id]
-       (with-nimbus nimbus
-         (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus (URLDecoder/decode id))
-               name (.get_name tplg)]
-           (.deactivate nimbus name)
-           (log-message "Deactivating topology '" name "'")))
-       (resp/redirect (str "/topology/" id)))
-     (POST "/topology/:id/rebalance/:wait-time" [id wait-time]
-       (with-nimbus nimbus
-         (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus (URLDecoder/decode id))
-               name (.get_name tplg)
-               options (RebalanceOptions.)]
-           (.set_wait_secs options (Integer/parseInt wait-time))
-           (.rebalance nimbus name options)
-           (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
-       (resp/redirect (str "/topology/" id)))
-     (POST "/topology/:id/kill/:wait-time" [id wait-time]
-       (with-nimbus nimbus
-         (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus (URLDecoder/decode id))
-               name (.get_name tplg)
-               options (KillOptions.)]
-           (.set_wait_secs options (Integer/parseInt wait-time))
-           (.killTopologyWithOpts nimbus name options)
-           (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
-       (resp/redirect (str "/topology/" id)))
-     (route/resources "/")
-     (route/not-found "Page not found"))
- 
-   (defroutes main-routes
-     (GET "/" [:as {servlet-request :servlet-request}]
-          (ui-template (main-page)
-                       (.getUserName http-creds-handler servlet-request)))
-     (GET "/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
-          (let [include-sys? (get-include-sys? cookies)
-                user (.getUserName http-creds-handler servlet-request)]
-            (ui-template (topology-page (URLDecoder/decode id) (:window m) include-sys? user) user)))
-     (GET "/topology/:id/component/:component" [:as {:keys [cookies servlet-request]}
-                                                id component & m]
-          (let [include-sys? (get-include-sys? cookies)
-                user (.getUserName http-creds-handler servlet-request)]
-            (ui-template (component-page (URLDecoder/decode id) component (:window m) include-sys? user)
-                         user)))
-     (route/resources "/")
-     (route/not-found "Page not found")))
- 
- (defn exception->html [ex]
-   (concat
-     [[:h2 "Internal Server Error"]]
-     [[:pre (let [sw (java.io.StringWriter.)]
+ (defroutes main-routes
+   (GET "/api/v1/cluster/configuration" []
+        (cluster-configuration))
+   (GET "/api/v1/cluster/summary" []
+        (json-response (cluster-summary)))
+   (GET "/api/v1/supervisor/summary" []
+        (json-response (supervisor-summary)))
+   (GET "/api/v1/topology/summary" []
+        (json-response (all-topologies-summary)))
 -  (GET  "/api/v1/topology/:id" [id & m]
 -        (let [id (url-decode id)]
 -          (json-response (topology-page id (:window m) (check-include-sys? (:sys m))))))
++  (GET  "/api/v1/topology/:id" [:as {:keys [cookies servlet-request]} id & m]
++        (let [id (url-decode id)
++              user (.getUserName http-creds-handler servlet-request)]
++          (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user))))
+   (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
 -       (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m)))))
 -  (GET "/api/v1/topology/:id/component/:component" [id component & m]
++        (let [id (url-decode id)
++              user (.getUserName http-creds-handler servlet-request)]
++          (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m)) user))))
++  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request]} id component & m]
+        (let [id (url-decode id)
 -             component (url-decode component)]
 -         (json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
 -  (POST "/api/v1/topology/:id/activate" [id]
++             component (url-decode component)
++             user (.getUserName http-creds-handler servlet-request)]
++         (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user))))
++  (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id]
+     (with-nimbus nimbus
+       (let [id (url-decode id)
+             tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 -            name (.get_name tplg)]
++            name (.get_name tplg)
++            user (.getUserName http-creds-handler servlet-request)]
++        (assert-authorized-topology-user user)
+         (.activate nimbus name)
+         (log-message "Activating topology '" name "'")))
+     (resp/redirect (str "/api/v1/topology/" id)))
+ 
 -  (POST "/api/v1/topology/:id/deactivate" [id]
++  (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id]
+     (with-nimbus nimbus
+       (let [id (url-decode id)
+             tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
 -            name (.get_name tplg)]
++            name (.get_name tplg)
++            user (.getUserName http-creds-handler servlet-request)]
++        (assert-authorized-topology-user user)
+         (.deactivate nimbus name)
+         (log-message "Deactivating topology '" name "'")))
+     (resp/redirect (str "/api/v1/topology/" id)))
 -  (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
++  (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
+     (with-nimbus nimbus
+       (let [id (url-decode id)
+             tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+             name (.get_name tplg)
 -            options (RebalanceOptions.)]
++            options (RebalanceOptions.)
++            user (.getUserName http-creds-handler servlet-request)]
++        (assert-authorized-topology-user user)
+         (.set_wait_secs options (Integer/parseInt wait-time))
+         (.rebalance nimbus name options)
+         (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+     (resp/redirect (str "/api/v1/topology/" id)))
 -  (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
++  (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time]
+     (with-nimbus nimbus
+       (let [id (url-decode id)
+             tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+             name (.get_name tplg)
 -            options (KillOptions.)]
++            options (KillOptions.)
++            user (.getUserName http-creds-handler servlet-request)]
++        (assert-authorized-topology-user user)
+         (.set_wait_secs options (Integer/parseInt wait-time))
+         (.killTopologyWithOpts nimbus name options)
+         (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+     (resp/redirect (str "/api/v1/topology/" id)))
+ 
+   (GET "/" [:as {cookies :cookies}]
+        (resp/redirect "/index.html"))
+   (route/resources "/")
+   (route/not-found "Page not found"))
+ 
+ (defn exception->json [ex]
+   { "error" "Internal Server Error"
+      "errorMessage" (let [sw (java.io.StringWriter.)]
        (.printStackTrace ex (java.io.PrintWriter. sw))
-       (.toString sw))]]))
+       (.toString sw))
+     })
  
  (defn catch-errors [handler]
    (fn [request]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/util.clj
index 45b5682,82e16dc..b5f0c0c
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@@ -928,31 -889,5 +928,34 @@@
                (list form x)))
    ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
  
 +(def LOG-DIR
 +  (.getCanonicalPath 
 +                (clojure.java.io/file (System/getProperty "storm.home") "logs")))
 +
 +(defn- logs-rootname [storm-id port]
 +  (str storm-id "-worker-" port))
 +
 +(defn logs-filename [storm-id port]
 +  (str (logs-rootname storm-id port) ".log"))
 +
 +(defn logs-metadata-filename [storm-id port]
 +  (str (logs-rootname storm-id port) ".yaml"))
 +
 +(def worker-log-filename-pattern #"((.*-\d+-\d+)-worker-(\d+)).log")
 +
 +(defn get-log-metadata-file
 +  ([fname]
 +    (if-let [[_ _ id port] (re-matches worker-log-filename-pattern fname)]
 +      (get-log-metadata-file id port)))
 +  ([id port]
 +    (clojure.java.io/file LOG-DIR "metadata" (logs-metadata-filename id port))))
 +
 +(defn clojure-from-yaml-file [yamlFile]
 +  (try
 +    (let [obj (.load (Yaml. (SafeConstructor.)) (java.io.FileReader. yamlFile))]
 +      (clojurify-structure obj))
 +    (catch Exception ex
 +      (log-error ex))))
++
+ (defn hashmap-to-persistent [^HashMap m]
+   (zipmap (.keySet m) (.values m)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 07da267,ff309a5..3b3f7e5
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -741,14 -475,12 +754,20 @@@ public class Config extends HashMap<Str
      public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
  
      /**
 +     * The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced
 +     * with an identifier for this worker.  Because the JVM complains about multiple GC opts the topology
 +     * can override this default value by setting topology.worker.gc.childopts.
 +     */
 +    public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
 +    public static final Object WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 +
 +    /**
+      * control how many worker receiver threads we need per worker
+      */
+     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
+     public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+     
+     /**
       * How often this worker should heartbeat to the supervisor.
       */
      public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/Utils.java
index c28d93a,6a0a447..87f1654
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@@ -310,17 -303,39 +312,41 @@@ public class Utils 
      }
      
      public static Integer getInt(Object o) {
-         if(o instanceof Long) {
-             return ((Long) o ).intValue();
-         } else if (o instanceof Integer) {
-             return (Integer) o;
-         } else if (o instanceof Short) {
-             return ((Short) o).intValue();
-         } else if (o instanceof String) {
-             return Integer.parseInt((String) o);
-         } else {
-             throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
-         }
+       Integer result = getInt(o, null);
+       if (null == result) {
+         throw new IllegalArgumentException("Don't know how to convert null + to int");
+       }
+       return result;
+     }
+     
+     public static Integer getInt(Object o, Integer defaultValue) {
+       if (null == o) {
+         return defaultValue;
+       }
+       
+       if(o instanceof Long) {
+           return ((Long) o ).intValue();
+       } else if (o instanceof Integer) {
+           return (Integer) o;
+       } else if (o instanceof Short) {
+           return ((Short) o).intValue();
++      } else if (o instanceof String) {
++          return Integer.parseInt((String) o);
+       } else {
+           throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
+       }
+     }
+ 
+     public static boolean getBoolean(Object o, boolean defaultValue) {
+       if (null == o) {
+         return defaultValue;
+       }
+       
+       if(o instanceof Boolean) {
+           return (Boolean) o;
+       } else {
+           throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean");
+       }
      }
      
      public static long secureRandomLong() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/index.html
index 0000000,77af159..fe83b2c
mode 000000,100644..100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@@ -1,0 -1,73 +1,73 @@@
+ <html><head>
+ <title>Storm UI</title>
+ <link href="/css/bootstrap-1.4.0.css" rel="stylesheet" type="text/css">
+ <link href="/css/style.css" rel="stylesheet" type="text/css">
+ <script src="/js/jquery-1.6.2.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.tablesorter.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
+ <script src="/js/jquery.mustache.js" type="text/javascript"></script>
+ <script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
+ <script src="/js/script.js" type="text/javascript"></script>
+ </head>
+ <body>
+ <h1><a href="/">Storm UI</a></h1>
+ <h2>Cluster Summary</h2>
+ <div id="cluster-summary">
+ </div>
+ <h2>Topology summary</h2>
+ <div id="topology-summary">
+ </div>
+ <h2>Supervisor summary</h2>
+ <div id="supervisor-summary">
+ </div>
+ <h2>Nimbus Configuration</h2>
+ <div id="nimbus-configuration"></div>
+ <div id="json-response-error"></div>
+ </body>
+ <script>
+ $(document).ready(function() {
+     $.ajaxSetup({
+         "error":function(jqXHR,textStatus,response) {
+             var errorJson = jQuery.parseJSON(jqXHR.responseText);
+             $.get("/templates/json-error-template.html", function(template) {
+                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
+             });
+         }
+     });
+     var template = $.get("/templates/index-page-template.html");
+     var clusterSummary = $("#cluster-summary");
+     var topologySummary = $("#topology-summary");
+     var supervisorSummary = $("#supervisor-summary");
+     var config = $("#nimbus-configuration");
+ 
+     $.getJSON("/api/v1/cluster/summary",function(response,status,jqXHR) {
+         $.get("/templates/index-page-template.html", function(template) {
+             clusterSummary.append(Mustache.render($(template).filter("#cluster-summary-template").html(),response));
+         });
+     });
+     $.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
+       $.get("/templates/index-page-template.html", function(template) {
+           topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
+           if(response["topologies"].length > 0) {
 -              $("#topology-summary-table").tablesorter({ sortList: [[0,0]], headers: {3: { sorter: "stormtimestr"}}});
++              $("#topology-summary-table").tablesorter({ sortList: [[0,0]], headers: {4: { sorter: "stormtimestr"}}});
+               }
+       });
+     });
+     $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) {
+       $.get("/templates/index-page-template.html", function(template) {
+           supervisorSummary.append(Mustache.render($(template).filter("#supervisor-summary-template").html(),response));
+           if(response["supervisors"].length > 0) {
 -              $("#supervisor-summary-table").tablesorter({ sortList: [[0,0]], headers: {3: { sorter: "stormtimestr"}}});
++              $("#supervisor-summary-table").tablesorter({ sortList: [[0,0]], headers: {2: { sorter: "stormtimestr"}}});
+           }
+       });
+     });
+     $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) {
+       var formattedResponse = formatConfigData(response);
+       $.get("/templates/index-page-template.html", function(template) {
+           config.append(Mustache.render($(template).filter("#configuration-template").html(),formattedResponse));
+           $("#nimbus-configuration-table").tablesorter({ sortList: [[0,0]], headers: {}});
+       });
+     });
+   });
+ </script>
+ </html>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/ui/public/js/script.js
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/js/script.js
index e9902ab,8f7608e..6a9a560
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@@ -106,4 -106,49 +106,50 @@@ $(function () 
            delayIn: 1000
        });
      }
- })
+ });
+ 
+ function formatConfigData(data) {
+     var mustacheFormattedData = {'config':[]};
+     for (var prop in data) {
+        if(data.hasOwnProperty(prop)) {
+            mustacheFormattedData['config'].push({
+                'key': prop,
+                'value': data[prop]
+            });
+        }
+     }
+     return mustacheFormattedData;
+ }
+ 
+ 
+ function renderToggleSys(div) {
+     var sys = $.cookies.get("sys") || false;
+     if(sys) {
+        div.append("<span data-original-title=\"Use this to toggle inclusion of storm system components.\" class=\"tip right\"><input onclick=\"toggleSys()\" value=\"Hide System Stats\" type=\"button\"></span>");
+     } else {
+        div.append("<span class=\"tip right\" title=\"Use this to toggle inclusion of storm system components.\"><input onclick=\"toggleSys()\" value=\"Show System Stats\" type=\"button\"></span>");
+     }
+ }
+ 
 -function topologyActionJson(id,name,status,msgTimeout) {
++function topologyActionJson(id,name,status,msgTimeout, uiActionsEnabled) {
+     var jsonData = {};
++    jsonData["uiActionsEnabled"] = uiActionsEnabled;
+     jsonData["id"] = id;
+     jsonData["name"] = name;
+     jsonData["msgTimeout"] = msgTimeout;
+     jsonData["activateStatus"] = (status === "ACTIVE") ? "disabled" : "enabled";
+     jsonData["deactivateStatus"] = (status === "ACTIVE") ? "enabled" : "disabled";
+     jsonData["rebalanceStatus"] = (status === "ACTIVE" || status === "INACTIVE" ) ? "enabled" : "disabled";
+     jsonData["killStatus"] = (status !== "KILLED") ? "enabled" : "disabled";
+     return jsonData;
+ }
+ 
+ function topologyActionButton(id,name,status,actionLabel,command,wait,defaultWait) {
+     var buttonData = {};
+     buttonData["buttonStatus"] = status ;
+     buttonData["actionLabel"] = actionLabel;
+     buttonData["command"] = command;
+     buttonData["isWait"] = wait;
+     buttonData["defaultWait"] = defaultWait;
+     return buttonData;
+ }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0a98bee2/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
diff --cc storm-core/src/ui/public/templates/index-page-template.html
index 0000000,128f2d0..20eddad
mode 000000,100644..100644
--- a/storm-core/src/ui/public/templates/index-page-template.html
+++ b/storm-core/src/ui/public/templates/index-page-template.html
@@@ -1,0 -1,62 +1,74 @@@
+ <script id="cluster-summary-template" type="text/html">
+ <table id="cluster-summary-table"><thead><tr><th><span class="tip right" title="The version of storm installed on the UI node. (Hopefully, this is the same on all storm nodes!)">Version</span></th><th><span class="tip right" title="The duration the current Nimbus instance has been running. (Note that the storm cluster may have been deployed and available for a much longer period than the current Nimbus process has been running.)">Nimbus uptime</span></th><th><span class="tip above" title="The number of nodes in the cluster currently.">Supervisors</span></th><th><span class="tip above" title="Slots are Workers (processes).">Used slots</span></th><th><span class="tip above" title="Slots are Workers (processes).">Free slots</span></th><th><span class="tip above" title="Slots are Workers (processes).">Total slots</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Executors</span></th><th><span class="tip left" title="A Task is an instance of a Bol
 t or Spout. The number of Tasks is almost always equal to the number of Executors.">Tasks</span></th></tr></thead>
+ <tbody>
+ <tr>
+   <td>{{stormVersion}}</td>
+   <td>{{nimbusUptime}}</td>
+   <td>{{supervisors}}</td>
+   <td>{{slotsUsed}}</td>
+   <td>{{slotsFree}}</td>
+   <td>{{slotsTotal}}</td>
+   <td>{{executorsTotal}}</td>
+   <td>{{tasksTotal}}</td>
+ </tr>
+ </tbody>
+ </table>
+ </script>
+ <script id="topology-summary-template" type="text/html">
+ <table class="zebra-striped" id="topology-summary-table">
 -<thead><tr><th><span class="tip right" title="The name given to the topology by when it was submitted. Click the name to view the Topology's information.">Name</span></th><th><span class="tip right" title="The unique ID given to a Topology each time it is launched.">Id</span></th><th><span class="tip above" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING.">Status</span></th><th><span class="tip above" title="The time since the Topology was submitted.">Uptime</span></th><th><span class="tip above" title="The number of Workers (processes).">Num workers</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Num executors</span></th><th><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Num tasks</span></th></tr></thead>
++<thead><tr>
++    <th><span class="tip right" title="The name given to the topology by when it was submitted. Click the name to view the Topology's information.">Name</span></th>
++    <th><span class="tip right" title="The unique ID given to a Topology each time it is launched.">Id</span></th>
++    <th><span class="tip above" title="The user that submitted the Topology, if authentication is enabled.">Owner</span></th>
++    <th><span class="tip above" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING.">Status</span></th>
++    <th><span class="tip above" title="The time since the Topology was submitted.">Uptime</span></th>
++    <th><span class="tip above" title="The number of Workers (processes).">Num workers</span></th>
++    <th><span class="tip above" title="Executors are threads in a Worker process.">Num executors</span></th>
++    <th><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Num tasks</span></th>
++    <th><span class="tip left" title="This shows information from the scheduler about the latest attempt to schedule the Topology on the cluster.">Scheduler Info</span></th>
++</tr></thead>
+ <tbody>
+ {{#topologies}}
+ <tr>
+   <td><a href="/topology.html?id={{id}}">{{name}}</a></td>
+   <td>{{id}}</td>
++  <td>{{owner}}</td>
+   <td>{{status}}</td>
+   <td>{{uptime}}</td>
+   <td>{{tasksTotal}}</td>
+   <td>{{workersTotal}}</td>
+   <td>{{executorsTotal}}</td>
++  <td>{{schedulerInfo}}</td>
+ </tr>
+ {{/topologies}}
+ </tbody>
+ </table>
+ </script>
+ <script id="supervisor-summary-template" type="text/html">
+ <table class="zebra-striped" id="supervisor-summary-table"><thead><tr><th><span class="tip right" title="A unique identifier given to a Supervisor when it joins the cluster.">Id</span></th><th><span class="tip above" title="The hostname reported by the remote host. (Note that this hostname is not the result of a reverse lookup at the Nimbus node.)">Host</span></th><th><span class="tip above" title="The length of time a Supervisor has been registered to the cluster.">Uptime</span></th><th><span class="tip above" title="Slots are Workers (processes).">Slots</span></th><th><span class="tip left" title="Slots are Workers (processes).">Used slots</span></th></tr></thead>
+ <tbody>
+ {{#supervisors}}
+ <tr>
+   <td>{{id}}</td>
+   <td>{{host}}</td>
+   <td>{{uptime}}</td>
+   <td>{{slotsTotal}}</td>
+   <td>{{slotsUsed}}</td>
+ </tr>
+ {{/supervisors}}
+ </tbody>
+ </table>
+ </script>
+ 
+ <script id="configuration-template" type="text/html">
+ <table class="zebra-striped" id="nimbus-configuration-table"><thead><tr><th>Key</th><th>Value</th></tr></thead>
+ <tbody>
+ {{#config}}
+ <tr>
+ <td>{{key}}</td>
+ <td>{{value}}</td>
+ </tr>
+ {{/config}}
+ </tbody>
+ </table>
+ </script>


Mime
View raw message