storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [03/10] storm git commit: Adding dynamic profiling for worker
Date Wed, 04 Nov 2015 17:18:46 GMT
Adding dynamic profiling for worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0c2021e6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0c2021e6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0c2021e6

Branch: refs/heads/master
Commit: 0c2021e6331cb5324a2803aff9783b539a05f63c
Parents: 8ba776b
Author: Kishor Patil <kpatil@yahoo-inc.com>
Authored: Mon Nov 2 18:51:58 2015 +0000
Committer: Kishor Patil <kpatil@yahoo-inc.com>
Committed: Mon Nov 2 18:51:58 2015 +0000

----------------------------------------------------------------------
 bin/flight.bash                                 |  139 ++
 conf/defaults.yaml                              |    1 +
 docs/DYNAMIC_WORKER_PROFILING.md                |   24 +
 docs/images/dynamic_profiling_debugging_1.png   |  Bin 0 -> 93635 bytes
 docs/images/dynamic_profiling_debugging_2.png   |  Bin 0 -> 138120 bytes
 docs/images/dynamic_profiling_debugging_3.png   |  Bin 0 -> 96974 bytes
 storm-core/src/clj/backtype/storm/cluster.clj   |   57 +-
 storm-core/src/clj/backtype/storm/config.clj    |    4 +
 storm-core/src/clj/backtype/storm/converter.clj |   19 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   45 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   35 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  156 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    4 +-
 storm-core/src/clj/backtype/storm/stats.clj     |    9 +
 storm-core/src/clj/backtype/storm/ui/core.clj   |  149 +-
 storm-core/src/clj/backtype/storm/util.clj      |   10 +-
 storm-core/src/jvm/backtype/storm/Config.java   |    6 +
 .../jvm/backtype/storm/generated/Nimbus.java    | 1977 +++++++++++++++++-
 .../backtype/storm/generated/ProfileAction.java |   74 +
 .../storm/generated/ProfileRequest.java         |  631 ++++++
 .../auth/authorizer/SimpleACLAuthorizer.java    |    8 +
 .../src/native/worker-launcher/impl/main.c      |   10 +
 .../worker-launcher/impl/worker-launcher.c      |   47 +
 .../worker-launcher/impl/worker-launcher.h      |    2 +
 storm-core/src/py/storm/Nimbus-remote           |   14 +
 storm-core/src/py/storm/Nimbus.py               |  396 ++++
 storm-core/src/py/storm/ttypes.py               |  122 ++
 storm-core/src/storm.thrift                     |   20 +
 storm-core/src/ui/public/component.html         |  165 +-
 .../templates/component-page-template.html      |   53 +
 30 files changed, 4149 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/bin/flight.bash
----------------------------------------------------------------------
diff --git a/bin/flight.bash b/bin/flight.bash
new file mode 100755
index 0000000..05068b5
--- /dev/null
+++ b/bin/flight.bash
@@ -0,0 +1,139 @@
+#!/bin/bash
+
+JDKPATH="/home/y/share/yjava_jdk/java"
+BINPATH="/usr/bin"
+USER=`whoami`
+
+#SETTINGS=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/jfr/profile.jfc
+SETTINGS=profile
+
+platform='unknown'
+unamestr=`uname`
+if [[ "$unamestr" == 'Linux' ]]; then
+    platform='linux'
+elif [[ "$unamestr" == 'Darwin' ]]; then
+    platform='darwin'
+elif [[ "$unamestr" == 'FreeBSD' ]]; then
+    platform='freebsd'
+fi
+
+if [[ $platform == 'linux' ]]; then
+    BINPATH="$JDKPATH/bin"
+elif [[ $platform == 'darwin' ]]; then
+    BINPATH="/usr/bin"
+fi
+
+function start_record {
+    # start_record pid
+    already_recording=false
+    for rid in `get_recording_ids $1`; do
+        already_recording=true
+        break;
+    done
+    if [ "$already_recording" = false ]; then
+        $BINPATH/jcmd $1 JFR.start settings=${SETTINGS}
+    fi
+}
+
+function dump_record {
+    for rid in `get_recording_ids $1`; do
+        FILENAME=recording-$1-${rid}-${NOW}.jfr
+        $BINPATH/jcmd $1 JFR.dump recording=$rid filename="$2/${FILENAME}"
+    done
+}
+
+function jstack_record {
+    FILENAME=jstack-$1-${NOW}.txt
+    $BINPATH/jstack $1 > "$2/${FILENAME}"
+}
+
+function jmap_record {
+    FILENAME=recording-$1-${NOW}.bin
+    $BINPATH/jmap -dump:format=b,file="$2/${FILENAME}" $1
+}
+
+function stop_record {
+    for rid in `get_recording_ids $1`; do
+        FILENAME=recording-$1-${rid}-${NOW}.jfr
+        $BINPATH/jcmd $1 JFR.dump recording=$rid filename="$2/${FILENAME}"
+        $BINPATH/jcmd $1 JFR.stop recording=$rid
+    done
+}
+
+function get_recording_ids {
+    $BINPATH/jcmd $1 JFR.check | perl -n -e '/recording=([0-9]+)/ && print "$1 "'
+}
+
+function usage_and_quit {
+    echo "Usage: $0 pid start [profile_settings]"
+    echo "       $0 pid dump target_dir"
+    echo "       $0 pid stop target_dir"
+    echo "       $0 pid jstack target_dir"
+    echo "       $0 pid jmap target_dir"
+    echo "       $0 pid kill"
+    exit -1
+}
+
+# Before using this script: make sure FlightRecorder is enabled
+
+if [ "$#" -le 1 ]; then
+    echo "Wrong number of arguments.."
+    usage_and_quit
+
+fi
+# call this script with the process pid, example: "./flight PID start" or "./flight PID stop"
+PID="$1"
+CMD="$2"
+
+if /bin/ps -p $PID > /dev/null
+then
+    if [[ $platform == 'linux' ]]; then
+        USER=`/bin/ps -ouser --noheader $PID`
+    elif [[ $platform == 'darwin' ]]; then
+        USER=`/bin/ps -ouser $PID`
+    fi
+else
+    echo "No such pid running: $PID"
+    usage_and_quit
+fi
+
+if [ "$CMD" != "start" ] && [ "$CMD" != "kill" ]; then
+    if [[ $3 ]] && [[ -d $3 ]]
+    then
+        TARGETDIR="$3"
+        mkdir -p ${TARGETDIR}
+    else
+        echo "Missing target directory"
+        usage_and_quit
+    fi
+fi
+
+NOW=`date +'%Y%m%d-%H%M%S'`
+if [ "$CMD" = "" ]; then
+    usage_and_quit
+elif [ "$CMD" = "kill" ]; then
+    echo "Killing process with pid: $PID"
+    kill -9 ${PID}
+elif [ "$CMD" = "start" ]; then
+    if [[ $3 ]]
+    then
+        SETTINGS=$3
+    fi
+    start_record ${PID}
+elif [ "$CMD" = "stop" ]; then
+    echo "Capturing dump before stopping in dir $TARGETDIR"
+    stop_record ${PID} ${TARGETDIR}
+elif [ "$CMD" = "jstack" ]; then
+    echo "Capturing dump in dir $TARGETDIR"
+    jstack_record ${PID} ${TARGETDIR}
+elif [ "$CMD" = "jmap" ]; then
+    echo "Capturing dump in dir $TARGETDIR"
+    jmap_record ${PID} ${TARGETDIR}
+elif [ "$CMD" = "dump" ]; then
+    echo "Capturing dump in dir $TARGETDIR"
+    dump_record ${PID} ${TARGETDIR}
+else
+    usage_and_quit
+fi
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 160c29f..84babc3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -144,6 +144,7 @@ supervisor.cpu.capacity: 400.0
 worker.heap.memory.mb: 768
 worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump"
 worker.gc.childopts: ""
+worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
 worker.heartbeat.frequency.secs: 1
 
 # check whether dynamic log levels can be reset from DEBUG to INFO in workers

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/docs/DYNAMIC_WORKER_PROFILING.md
----------------------------------------------------------------------
diff --git a/docs/DYNAMIC_WORKER_PROFILING.md b/docs/DYNAMIC_WORKER_PROFILING.md
new file mode 100644
index 0000000..727322f
--- /dev/null
+++ b/docs/DYNAMIC_WORKER_PROFILING.md
@@ -0,0 +1,24 @@
+Dynamic Worker Profiling
+==========================
+
+In multi-tenant mode, storm launches long-running JVMs across cluster without sudo access to user. Self-serving of Java heap-dumps, jstacks and java profiling of these JVMs would improve users' ability to analyze and debug issues when monitoring it actively.
+
+The storm dynamic profiler lets you dynamically take heap-dumps, jprofile or jstack for a worker jvm running on stock cluster. It let user download these dumps from the browser and use your favorite tools to analyze it  The UI component page provides list workers for the component and action buttons. The logviewer lets you download the dumps generated by these logs. Please see the screenshots for more information.
+
+Using the Storm UI
+-------------
+
+In order to request for heap-dump, jstack, start/stop/dump jprofile or restart a worker, click on a running topology, then click on specific component, then you can select worker from the dropdown for that particular component and then click on “Start","Heap", "Jstack" or "Restart Worker" in the "Profiing and Debugging" section.
+
+![Profiling and Debugging](images/dynamic_profiling_debugging_1.png "Profiling and Debugging")
+
+For start jprofile, provide a timeout in minutes (or 10 if not needed). Then click on “Start”.
+
+![After starting jprofile for worker](images/dynamic_profiling_debugging_2.png "After jprofile for worker ")
+
+To stop the jprofile logging click on the “Stop” button. This dumps the jprofile stats and stops the profiling. Refresh the page for the line to disappear from the UI.
+
+Click on "My Dump Files" to go the logviewer UI for list of worker specific dump files.
+
+![Dump Files Links for worker](images/dynamic_profiling_debugging_3.png "Dump Files Links for worker")
+

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/docs/images/dynamic_profiling_debugging_1.png
----------------------------------------------------------------------
diff --git a/docs/images/dynamic_profiling_debugging_1.png b/docs/images/dynamic_profiling_debugging_1.png
new file mode 100644
index 0000000..3913e86
Binary files /dev/null and b/docs/images/dynamic_profiling_debugging_1.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/docs/images/dynamic_profiling_debugging_2.png
----------------------------------------------------------------------
diff --git a/docs/images/dynamic_profiling_debugging_2.png b/docs/images/dynamic_profiling_debugging_2.png
new file mode 100644
index 0000000..66c0236
Binary files /dev/null and b/docs/images/dynamic_profiling_debugging_2.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/docs/images/dynamic_profiling_debugging_3.png
----------------------------------------------------------------------
diff --git a/docs/images/dynamic_profiling_debugging_3.png b/docs/images/dynamic_profiling_debugging_3.png
new file mode 100644
index 0000000..5706d7e
Binary files /dev/null and b/docs/images/dynamic_profiling_debugging_3.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index ec42da6..cf1ece6 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -17,7 +17,7 @@
 (ns backtype.storm.cluster
   (:import [org.apache.zookeeper.data Stat ACL Id]
            [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
-            LogConfig]
+            LogConfig ProfileAction ProfileRequest NodeInfo]
            [java.io Serializable])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
   (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState])
@@ -186,6 +186,10 @@
   (active-storms [this])
   (storm-base [this storm-id callback])
   (get-worker-heartbeat [this storm-id node port])
+  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
+  (get-topology-profile-requests [this storm-id thrift?])
+  (set-worker-profile-request [this storm-id profile-request])
+  (delete-topology-profile-requests [this storm-id profile-request])
   (executor-beats [this storm-id executor->node+port])
   (supervisors [this callback])
   (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
@@ -228,6 +232,7 @@
 (def NIMBUSES-ROOT "nimbuses")
 (def CREDENTIALS-ROOT "credentials")
 (def LOGCONFIG-ROOT "logconfigs")
+(def PROFILERCONFIG-ROOT "profilerconfigs")
 
 (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
 (def STORMS-SUBTREE (str "/" STORMS-ROOT))
@@ -239,6 +244,7 @@
 (def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
 (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
 (def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
+(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))
 
 (defn supervisor-path
   [id]
@@ -302,6 +308,12 @@
   [storm-id]
   (str LOGCONFIG-SUBTREE "/" storm-id))
 
+(defn profiler-config-path
+  ([storm-id]
+   (str PROFILERCONFIG-SUBTREE "/" storm-id))
+  ([storm-id host port request-type]
+   (str (profiler-config-path storm-id) "/" host "_" port "_" request-type)))
+
 (defn- issue-callback!
   [cb-atom]
   (let [cb @cb-atom]
@@ -497,6 +509,48 @@
         [this storm-id log-config]
         (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
 
+      (set-worker-profile-request
+        [this storm-id profile-request]
+        (let [request-type (.get_action profile-request)
+              host (.get_node (.get_nodeInfo profile-request))
+              port (first (.get_port (.get_nodeInfo profile-request)))]
+          (.set_data cluster-state
+                     (profiler-config-path storm-id host port request-type)
+                     (Utils/serialize profile-request)
+                     acls)))
+
+      (get-topology-profile-requests
+        [this storm-id thrift?]
+        (let [path (profiler-config-path storm-id)
+              requests (if (exists-node? cluster-state path false)
+                         (dofor [c (.get_children cluster-state path false)]
+                                (let [raw (.get_data cluster-state (str path "/" c) false)
+                                      request (maybe-deserialize raw ProfileRequest)]
+                                      (if thrift?
+                                        request
+                                        (clojurify-profile-request request)))))]
+          requests))
+
+      (delete-topology-profile-requests
+        [this storm-id profile-request]
+        (let [profile-request-inst (thriftify-profile-request profile-request)
+              action (:action profile-request)
+              host (:host profile-request)
+              port (:port profile-request)]
+          (.delete_node cluster-state
+           (profiler-config-path storm-id host port action))))
+          
+      (get-worker-profile-requests
+        [this storm-id node-info thrift?]
+        (let [host (:host node-info)
+              port (:port node-info)
+              profile-requests (get-topology-profile-requests this storm-id thrift?)]
+          (if thrift?
+            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo  %)))))
+                    profile-requests)
+            (filter #(and (= host (:host %)) (= port (:port %)))
+                    profile-requests))))
+      
       (worker-heartbeat!
         [this storm-id node port info]
         (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
@@ -607,6 +661,7 @@
         (delete-node cluster-state (code-distributor-path storm-id))
         (delete-node cluster-state (credentials-path storm-id))
         (delete-node cluster-state (log-config-path storm-id))
+        (delete-node cluster-state (profiler-config-path storm-id))
         (remove-storm-base! this storm-id))
 
       (set-credentials!

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index f06f6e9..c9e151a 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -229,6 +229,10 @@
   ([conf id port]
    (str (worker-artifacts-root conf id) file-path-separator port)))
 
+(defn worker-artifacts-pid-path
+  [conf id port]
+  (str (worker-artifacts-root conf id port) file-path-separator "worker.pid"))
+
 (defn get-log-metadata-file
   ([fname]
     (let [[id port & _] (str/split fname (re-pattern file-path-separator))]

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
index 21920e4..52a1817 100644
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -16,7 +16,7 @@
 (ns backtype.storm.converter
   (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
             StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
-            TopologyActionOptions DebugOptions])
+            TopologyActionOptions DebugOptions ProfileRequest])
   (:use [backtype.storm util stats log])
   (:require [backtype.storm.daemon [common :as common]]))
 
@@ -249,6 +249,23 @@
     (.set_host (:host error))
     (.set_port (:port error))))
 
+(defn clojurify-profile-request
+  [^ProfileRequest request]
+  (when request
+    {:host (.get_node (.get_nodeInfo request))
+     :port (first (.get_port (.get_nodeInfo request)))
+     :action     (.get_action request)
+     :timestamp  (.get_time_stamp request)}))
+
+(defn thriftify-profile-request
+  [profile-request]
+  (let [nodeinfo (doto (NodeInfo.)
+                   (.set_node (:host profile-request))
+                   (.set_port (set [(:port profile-request)])))
+        request (ProfileRequest. nodeinfo (:action profile-request))]
+    (.set_time_stamp request (:timestamp profile-request))
+    request))
+
 (defn thriftify-credentials [credentials]
     (doto (Credentials.)
       (.set_creds (if credentials credentials {}))))

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 353a58e..5303a25 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -548,6 +548,51 @@ Note that if anything goes wrong, this will throw an Error and exit."
       (catch InvalidRequestException ex
         (log-error ex)
         (ring-response-from-exception ex))))
+  (GET "/dumps/:topo-id/:host-port/:filename"
+       [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename &m]
+     (let [port (second (split host-port #":"))]
+       (-> (resp/response (File. (str log-root
+                                      file-path-separator
+                                      topo-id
+                                      file-path-separator
+                                      port
+                                      file-path-separator
+                                      filename)))
+           (resp/content-type "application/octet-stream"))))
+  (GET "/dumps/:topo-id/:host-port"
+       [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port &m]
+     (let [user (.getUserName http-creds-handler servlet-request)
+           port (second (split host-port #":"))
+           dir (File. (str log-root
+                           file-path-separator
+                           topo-id
+                           file-path-separator
+                           port))
+           files (filter (comp not nil?)
+                         (for [f (.listFiles dir)]
+                           (let [name (.getName f)]
+                             (if (or
+                                  (.endsWith name ".txt")
+                                  (.endsWith name ".jfr")
+                                  (.endsWith name ".bin"))
+                               (.getName f)))))]
+       (if (.exists dir)
+         (if (or (blank? (*STORM-CONF* UI-FILTER))
+               (authorized-log-user? user "worker.log" *STORM-CONF*))
+           (html4
+             [:head
+              [:title "File Dumps - Storm Log Viewer"]
+              (include-css "/css/bootstrap-3.3.1.min.css")
+              (include-css "/css/jquery.dataTables.1.10.4.min.css")
+              (include-css "/css/style.css")]
+             [:body
+              [:ul
+               (for [file files]
+                 [:li
+                  [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} file ]])]])
+           (unauthorized-user-html user))
+         (-> (resp/response "Page not found")
+           (resp/status 404)))))
   (GET "/daemonlog" [:as req & m]
     (try
       (let [servlet-request (:servlet-request req)

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index faf7cde..205c6f4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -20,7 +20,7 @@
   (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
   (:import [org.apache.commons.io FileUtils])
   (:import [java.nio ByteBuffer]
-           [java.util Collections HashMap]
+           [java.util Collections List HashMap]
            [backtype.storm.generated NimbusSummary])
   (:import [java.io FileNotFoundException File FileOutputStream])
   (:import [java.net InetAddress])
@@ -36,7 +36,8 @@
             ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
             ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
-            ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction])
+            ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
+            ProfileRequest ProfileAction NodeInfo])
   (:import [backtype.storm.daemon Shutdownable])
   (:use [backtype.storm util config log timer zookeeper])
   (:require [backtype.storm [cluster :as cluster]
@@ -1335,6 +1336,36 @@
           (locking (:submit-lock nimbus)
             (.update-storm! storm-cluster-state storm-id storm-base-updates))))
 
+      (^void setWorkerProfiler
+        [this ^String id ^ProfileRequest profileRequest]
+        (let [topology-conf (try-read-storm-conf conf id)
+              storm-name (topology-conf TOPOLOGY-NAME)
+              _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
+              storm-cluster-state (:storm-cluster-state nimbus)]
+          (.set-worker-profile-request storm-cluster-state id profileRequest)))
+
+      (^List getComponentPendingProfileActions
+        [this ^String id ^String component_id ^ProfileAction action]
+        (let [info (get-common-topo-info id "getComponentPendingProfileActions")
+              storm-cluster-state (:storm-cluster-state info)
+              task->component (:task->component info)
+              {:keys [executor->node+port node->host]} (:assignment info)
+              executor->host+port (map-val (fn [[node port]]
+                                             [(node->host node) port])
+                                    executor->node+port)
+              nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+              all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
+              latest-profile-actions (remove nil? (map (fn [nodeInfo]
+                                                         (->> all-pending-actions-for-topology
+                                                              (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %)))
+                                                                         (= (:port nodeInfo) (first (.get_port (.get_nodeInfo  %))))))
+                                                              (filter #(= action (.get_action %)))
+                                                              (sort-by #(.get_time_stamp %) >)
+                                                              first))
+                                                    nodeinfos))]
+          (log-message "Latest profile actions for topology " id " component " component_id " " (pr-str latest-profile-actions))
+          latest-profile-actions))
+
       (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 52b2057..8fe6eed 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -26,7 +26,7 @@
   (:use [backtype.storm config util log timer local-state])
   (:import [backtype.storm.utils VersionInfo])
   (:import [backtype.storm Config])
-  (:import [backtype.storm.generated WorkerResources])
+  (:import [backtype.storm.generated WorkerResources ProfileAction])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm.daemon [worker :as worker]]
             [backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
@@ -34,6 +34,7 @@
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
   (:import [org.yaml.snakeyaml Yaml]
            [org.yaml.snakeyaml.constructor SafeConstructor])
+  (:import [java.util Date])
   (:gen-class
     :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
 
@@ -59,9 +60,16 @@
                         {sid (.assignment-info-with-version storm-cluster-state sid callback)})
                       {sid nil})))
            (apply merge)
-           (filter-val not-nil?))]
-
+           (filter-val not-nil?))
+          new-profiler-actions
+          (->>
+            (dofor [sid (distinct storm-ids)]
+                   (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)]
+                      {sid topo-profile-actions}))
+           (apply merge))]
+         
       {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
+       :profiler-actions new-profiler-actions
        :versions new-assignments})))
 
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
@@ -318,6 +326,7 @@
    :sync-retry (atom 0)
    :code-distributor (mk-code-distributor conf)
    :download-lock (Object.)
+   :stormid->profiler-actions (atom {})
    })
 
 (defn sync-processes [supervisor]
@@ -449,9 +458,10 @@
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
           assignment-versions @(:assignment-versions supervisor)
-          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot
-                                                                   storm-cluster-state sync-callback
-                                                                  assignment-versions)
+          {assignments-snapshot :assignments
+           storm-id->profiler-actions :profiler-actions
+           versions :versions}
+          (assignments-snapshot storm-cluster-state sync-callback assignment-versions)
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
           existing-assignment (ls-local-assignments local-state)
@@ -467,6 +477,7 @@
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)
       (log-debug "All assignment: " all-assignment)
       (log-debug "New assignment: " new-assignment)
+      (log-debug "Storm Ids Profiler Actions" storm-id->profiler-actions)
 
       ;; download code first
       ;; This might take awhile
@@ -486,6 +497,8 @@
       (ls-local-assignments! local-state
             new-assignment)
       (reset! (:assignment-versions supervisor) versions)
+      (reset! (:stormid->profiler-actions supervisor) storm-id->profiler-actions)
+
       (reset! (:curr-assignment supervisor) new-assignment)
       ;; remove any downloaded code that's no longer assigned or active
       ;; important that this happens after setting the local assignment so that
@@ -508,6 +521,121 @@
   {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
    Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
 
+(defn jvm-cmd [cmd]
+  (let [java-home (.get (System/getenv) "JAVA_HOME")]
+    (if (nil? java-home)
+      cmd
+      (str java-home file-path-separator "bin" file-path-separator cmd))))
+
+(defn java-cmd []
+  (jvm-cmd "java"))
+
+(def PROFILE-CMD "flight.bash")
+
+(defn jmap-dump-cmd [pid target-dir]
+  [PROFILE-CMD pid "jmap" target-dir])
+
+(defn jstack-dump-cmd [pid target-dir]
+  [PROFILE-CMD pid "jstack" target-dir])
+
+(defn jprofile-start [pid]
+  [PROFILE-CMD pid "start" ])
+
+(defn jprofile-stop [pid target-dir]
+  [PROFILE-CMD pid "stop" target-dir])
+
+(defn jprofile-dump [pid workers-artifacts-directory]
+  [PROFILE-CMD pid "dump" workers-artifacts-directory])
+
+(defn jprofile-jvm-restart [pid]
+  [PROFILE-CMD pid "kill" ])
+
+(defn- delete-topology-profiler-action [storm-cluster-state storm-id profile-action]
+  (log-message "Deleting profiler action.." profile-action)
+  (.delete-topology-profile-requests storm-cluster-state storm-id profile-action))
+
+(defnk launch-profiler-action-for-worker
+  "Launch profiler action for a worker"
+  [conf user target-dir command :environment {} :exit-code-on-profile-action nil :log-prefix nil]
+  (if-let [run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)]
+    (let [container-file (container-file-path target-dir)
+          script-file (script-file-path target-dir)]
+      (log-message "Running as user:" user " command:" (shell-cmd command))
+      (if (exists-file? container-file) (rmr-as-user conf container-file container-file))
+      (if (exists-file? script-file) (rmr-as-user conf script-file script-file))
+      (worker-launcher
+        conf
+        user
+        ["profiler" target-dir (write-script target-dir command :environment environment)]
+        :log-prefix log-prefix
+        :exit-code-callback exit-code-on-profile-action
+        :directory (File. target-dir)))
+    (launch-process
+      command
+      :environment environment
+      :log-prefix log-prefix
+      :exit-code-callback exit-code-on-profile-action
+      :directory (File. target-dir))))
+
+(defn mk-run-profiler-actions-for-all-topologies
+  "Returns a function that downloads all profile-actions listed for all topologies assigned
+  to this supervisor, executes those actions as user and deletes them from zookeeper."
+  [supervisor]
+  (fn []
+    (try
+      (let [conf (:conf supervisor)
+            stormid->profiler-actions @(:stormid->profiler-actions supervisor)
+            storm-cluster-state (:storm-cluster-state supervisor)
+            hostname (:my-hostname supervisor)
+            new-assignment @(:curr-assignment supervisor)
+            assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+        (doseq [[storm-id profiler-actions] stormid->profiler-actions]
+          (when (not (empty? profiler-actions))
+            (doseq [pro-action profiler-actions]
+              (if (= hostname (:host pro-action))
+                (let [port (:port pro-action)
+                      action ^ProfileAction (:action pro-action)
+                      stop? (> (System/currentTimeMillis) (:timestamp pro-action))
+                      target-dir (worker-artifacts-root conf storm-id port)
+                      storm-conf (read-supervisor-storm-conf conf storm-id)
+                      user (storm-conf TOPOLOGY-SUBMITTER-USER)
+                      environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)] env {})
+                      worker-pid (slurp (worker-artifacts-pid-path conf storm-id port))
+                      log-prefix (str "ProfilerAction process " storm-id ":" port " PROFILER_ACTION: " action " ")
+                      ;; Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted
+                      ;; The profiler plugin script validates if JVM is recording before starting another recording.
+                      command (cond
+                                (= action ProfileAction/JMAP_DUMP) (jmap-dump-cmd worker-pid target-dir)
+                                (= action ProfileAction/JSTACK_DUMP) (jstack-dump-cmd worker-pid target-dir)
+                                (= action ProfileAction/JPROFILE_DUMP) (jprofile-dump worker-pid target-dir)
+                                (= action ProfileAction/JVM_RESTART) (jprofile-jvm-restart worker-pid)
+                                (and (not stop?)
+                                     (= action ProfileAction/JPROFILE_STOP))
+                                  (jprofile-start worker-pid) ;; Ensure the profiler is still running
+                                (and stop? (= action ProfileAction/JPROFILE_STOP)) (jprofile-stop worker-pid target-dir))
+                      action-on-exit (fn [exit-code]
+                                       (log-message log-prefix " profile-action exited for code: " exit-code)
+                                       (if (and (= exit-code 0) stop?)
+                                         (delete-topology-profiler-action storm-cluster-state storm-id pro-action)))
+                      command (->> command (map str) (filter (complement empty?)))]
+
+                  (try
+                    (launch-profiler-action-for-worker conf
+                      user
+                      target-dir
+                      command
+                      :environment environment
+                      :exit-code-on-profile-action action-on-exit
+                      :log-prefix log-prefix)
+                    (catch IOException ioe
+                      (log-error ioe
+                        (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later.")))
+                    (catch RuntimeException rte
+                      (log-error rte
+                        (str "Error in processing ProfilerAction '" action "' for " storm-id ":" port ", will retry later."))))))))))
+      (catch Exception e
+        (log-error e "Error running profiler actions, will retry again later")))))
+
 ;; in local state, supervisor stores who its current assignments are
 ;; another thread launches events to restart any dead processes if necessary
 (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
@@ -518,6 +646,7 @@
         [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
         sync-processes (partial sync-processes supervisor)
         synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
+        run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor)
         heartbeat-fn (fn [] (.supervisor-heartbeat!
                                (:storm-cluster-state supervisor)
                                (:supervisor-id supervisor)
@@ -545,7 +674,12 @@
       (schedule-recurring (:event-timer supervisor)
                           0
                           (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-                          (fn [] (.add processes-event-manager sync-processes))))
+                          (fn [] (.add processes-event-manager sync-processes)))
+      ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
+      (schedule-recurring (:event-timer supervisor)
+                          30
+                          30
+                          (fn [] (.add event-manager run-profiler-actions-fn))))
     (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
     (reify
      Shutdownable
@@ -662,12 +796,6 @@
       (sequential? value) (vec (map sub-fn value))
       :else (-> value sub-fn (clojure.string/split #"\s+")))))
 
-(defn java-cmd []
-  (let [java-home (.get (System/getenv) "JAVA_HOME")]
-    (if (nil? java-home)
-      "java"
-      (str java-home file-path-separator "bin" file-path-separator "java")
-      )))
 
 (defn create-artifacts-link
   "Create a symlink from workder directory to its port artifacts directory"
@@ -717,6 +845,7 @@
                              (substitute-childopts s worker-id storm-id port mem-onheap))
           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
                                   (substitute-childopts s worker-id storm-id port mem-onheap))
+          worker--profiler-childopts (substitute-childopts (conf WORKER-PROFILER-CHILDOPTS) worker-id storm-id port mem-onheap)
           topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
                                         (merge env {"LD_LIBRARY_PATH" jlp})
                                         {"LD_LIBRARY_PATH" jlp})
@@ -737,6 +866,7 @@
                     worker-childopts
                     topo-worker-childopts
                     gc-opts
+                    worker--profiler-childopts
                     [(str "-Djava.library.path=" jlp)
                      (str "-Dlogfile.name=" logfilename)
                      (str "-Dstorm.home=" storm-home)

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index f304bdd..2fd316c 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -538,7 +538,9 @@
   ;; because in local mode, its not a separate
   ;; process. supervisor will register it in this case
   (when (= :distributed (cluster-mode conf))
-    (touch (worker-pid-path conf worker-id (process-pid))))
+    (let [pid (process-pid)]
+      (touch (worker-pid-path conf worker-id pid))
+      (spit (worker-artifacts-pid-path conf storm-id port) pid)))
 
   (declare establish-log-setting-callback)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index 16a00ec..b73fd8b 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -810,6 +810,15 @@
       (.containsKey bolts id) :bolt
       (.containsKey spouts id) :spout)))
 
+(defn extract-nodeinfos-from-hb-for-comp
+  ([exec->host+port task->component include-sys? comp-id]
+   (distinct (for [[[start end :as executor] [host port]] exec->host+port
+         :let [id (task->component start)]
+         :when (and (or (nil? comp-id) (= comp-id id))
+                 (or include-sys? (not (Utils/isSystemId id))))]
+     {:host host
+      :port port}))))
+
 (defn extract-data-from-hb
   ([exec->host+port task->component beats include-sys? topology comp-id]
    (for [[[start end :as executor] [host port]] exec->host+port

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 0e12210..c6ffe22 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -21,13 +21,13 @@
         ring.middleware.multipart-params)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
-  (:use [backtype.storm config util log stats tuple zookeeper])
+  (:use [backtype.storm config util log stats tuple zookeeper converter])
   (:use [backtype.storm.ui helpers])
   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                               ACKER-FAIL-STREAM-ID mk-authorization-handler]]])
-  (:use [clojure.string :only [blank? lower-case trim]])
   (:import [backtype.storm.utils Utils]
            [backtype.storm.generated NimbusSummary])
+  (:use [clojure.string :only [blank? lower-case trim split]])
   (:import [backtype.storm.generated ExecutorSpecificStats
             ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
             ErrorInfo ClusterSummary SupervisorSummary TopologySummary
@@ -38,7 +38,7 @@
             ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
             LogConfig LogLevel LogLevelAction])
   (:import [backtype.storm.security.auth AuthUtils ReqContext])
-  (:import [backtype.storm.generated AuthorizationException])
+  (:import [backtype.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
   (:import [backtype.storm.security.auth AuthUtils])
   (:import [backtype.storm.utils VersionInfo])
   (:import [java.io File])
@@ -159,6 +159,13 @@
     (.get_error_time_secs ^ErrorInfo error)
     ""))
 
+(defn worker-dump-link [host port topology-id]
+  (url-format "http://%s:%s/dumps/%s/%s"
+              (url-encode host)
+              (*STORM-CONF* LOGVIEWER-PORT)
+              (url-encode topology-id)
+              (str (url-encode host) ":" (url-encode port))))
+
 (defn stats-times
   [stats-map]
   (sort-by #(Integer/parseInt %)
@@ -734,6 +741,22 @@
                           (.get_exec_stats info))}
     (-> info .get_errors (component-errors topology-id secure?))))
 
+(defn get-active-profile-actions
+  [nimbus topology-id component]
+  (let [profile-actions  (.getComponentPendingProfileActions nimbus
+                                               topology-id
+                                               component
+                                 ProfileAction/JPROFILE_STOP)
+        latest-profile-actions (map clojurify-profile-request profile-actions)
+        active-actions (map (fn [profile-action]
+                              {"host" (:host profile-action)
+                               "port" (str (:port profile-action))
+                               "dumplink" (worker-dump-link (:host profile-action) (str (:port profile-action)) topology-id)
+                               "timestamp" (str (- (:timestamp profile-action) (System/currentTimeMillis)))})
+                            latest-profile-actions)]
+    (log-message "Latest-active actions are: " (pr active-actions))
+    active-actions))
+
 (defn component-page
   [topology-id component window include-sys? user secure?]
   (thrift/with-configured-nimbus-connection nimbus
@@ -775,8 +798,9 @@
                                       component
                                       (.get_eventlog_host comp-page-info)
                                       (.get_eventlog_port comp-page-info)
-                                      secure?)))))
-
+                                      secure?)
+       "profilerActive" (get-active-profile-actions nimbus topology-id component)))))
+    
 (defn- level-to-dict [level]
   (if level
     (let [timeout (.get_reset_log_level_timeout_secs level)
@@ -986,6 +1010,121 @@
         (log-message "Setting topology " id " log config " new-log-config)
         (.setLogConfig nimbus id new-log-config)
         (json-response (log-config id) (m "callback")))))
+
+  (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
+       [:as {:keys [servlet-request]} id host-port timeout & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp (+ (System/currentTimeMillis) (* 60000 (Long. timeout)))
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JPROFILE_STOP)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port
+                           "timeout" timeout
+                           "dumplink" (worker-dump-link
+                                       host
+                                       port
+                                       id)}
+                          (m "callback")))))
+
+  (GET "/api/v1/topology/:id/profiling/stop/:host-port"
+       [:as {:keys [servlet-request]} id host-port & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp 0
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JPROFILE_STOP)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port}
+                          (m "callback")))))
+  
+  (GET "/api/v1/topology/:id/profiling/dumpprofile/:host-port"
+       [:as {:keys [servlet-request]} id host-port & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp (System/currentTimeMillis)
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JPROFILE_DUMP)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port}
+                          (m "callback")))))
+
+  (GET "/api/v1/topology/:id/profiling/dumpjstack/:host-port"
+       [:as {:keys [servlet-request]} id host-port & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp (System/currentTimeMillis)
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JSTACK_DUMP)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port}
+                          (m "callback")))))
+
+  (GET "/api/v1/topology/:id/profiling/restartworker/:host-port"
+       [:as {:keys [servlet-request]} id host-port & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp (System/currentTimeMillis)
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JVM_RESTART)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port}
+                          (m "callback")))))
+       
+  (GET "/api/v1/topology/:id/profiling/dumpheap/:host-port"
+       [:as {:keys [servlet-request]} id host-port & m]
+       (thrift/with-configured-nimbus-connection nimbus
+         (let [user (.getUserName http-creds-handler servlet-request)
+               topology-conf (from-json
+                              (.getTopologyConf ^Nimbus$Client nimbus id))]
+           (assert-authorized-user "setWorkerProfiler" (topology-config id)))
+         (let [[host, port] (split host-port #":")
+               nodeinfo (NodeInfo. host (set [(Long. port)]))
+               timestamp (System/currentTimeMillis)
+               request (ProfileRequest. nodeinfo
+                                        ProfileAction/JMAP_DUMP)]
+           (.set_time_stamp request timestamp)
+           (.setWorkerProfiler nimbus id request)
+           (json-response {"status" "ok"
+                           "id" host-port}
+                          (m "callback")))))
+  
   (GET "/" [:as {cookies :cookies}]
     (resp/redirect "/index.html"))
   (route/resources "/")

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 9ec8cd3..dc8ba2d 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -517,11 +517,17 @@
     (map #(str \' (clojure.string/escape % {\' "'\"'\"'"}) \'))
       (clojure.string/join " ")))
 
+(defn script-file-path [dir]
+  (str dir file-path-separator "storm-worker-script.sh"))
+
+(defn container-file-path [dir]
+  (str dir file-path-separator "launch_container.sh"))
+
 (defnk write-script
   [dir command :environment {}]
   (let [script-src (str "#!/bin/bash\n" (clojure.string/join "" (map (fn [[k v]] (str (shell-cmd ["export" (str k "=" v)]) ";\n")) environment)) "\nexec " (shell-cmd command) ";")
-        script-path (str dir "/storm-worker-script.sh")
-        - (spit script-path script-src)]
+        script-path (script-file-path dir)
+        _ (spit script-path script-src)]
     script-path
   ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 73e41cb..303a3a0 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1087,6 +1087,12 @@ public class Config extends HashMap<String, Object> {
     public static final String WORKER_HEAP_MEMORY_MB = "worker.heap.memory.mb";
 
     /**
+     * The jvm profiler opts provided to workers launched by this supervisor.
+     */
+    @isStringOrStringList
+    public static final String WORKER_PROFILER_CHILDOPTS = "worker.profiler.childopts";
+
+    /**
      * The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced
      * with an identifier for this worker.  Because the JVM complains about multiple GC opts the topology
      * can override this default value by setting topology.worker.gc.childopts.


Mime
View raw message