storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [01/12] git commit: windows port
Date Thu, 19 Dec 2013 06:25:13 GMT
Updated Branches:
  refs/heads/master de291341e -> ebd1181ab


windows port


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/6537459c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/6537459c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/6537459c

Branch: refs/heads/master
Commit: 6537459cb816fc3de58f04f2b855cd2fecd9c06c
Parents: d12c335
Author: David Lao <davidlao@microsoft.com>
Authored: Fri Jun 21 11:08:33 2013 -0700
Committer: David Lao <davidlao@microsoft.com>
Committed: Fri Jun 21 11:08:33 2013 -0700

----------------------------------------------------------------------
 project.clj                                  |  2 +
 src/clj/backtype/storm/config.clj            | 46 +++++++++++------------
 src/clj/backtype/storm/daemon/supervisor.clj | 25 +++++++++---
 src/clj/backtype/storm/testing.clj           |  6 ++-
 src/clj/backtype/storm/ui/core.clj           | 27 ++++++++++++-
 src/clj/backtype/storm/util.clj              | 18 +++++++--
 src/clj/backtype/storm/zookeeper.clj         |  4 +-
 7 files changed, 90 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/project.clj
----------------------------------------------------------------------
diff --git a/project.clj b/project.clj
index 7f59387..74c874d 100644
--- a/project.clj
+++ b/project.clj
@@ -10,6 +10,8 @@
                  [clj-time "0.4.1"]
                  [com.netflix.curator/curator-framework "1.0.1"
                   :exclusions [log4j/log4j]]
+                 [org.apache.zookeeper/zookeeper "3.4.5"
+                  :exclusions [com.sun.jmx/jmxri com.sun.jdmk/jmxtools javax.jms/jms log4j/log4j jline org.slf4j/slf4j-log4j12]]
                  [backtype/jzmq "2.1.0"]
                  [com.googlecode.json-simple/json-simple "1.1"]
                  [compojure "1.1.3"]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/config.clj b/src/clj/backtype/storm/config.clj
index 4614e17..932770d 100644
--- a/src/clj/backtype/storm/config.clj
+++ b/src/clj/backtype/storm/config.clj
@@ -69,67 +69,67 @@
   (clojurify-structure (Utils/findAndReadConfigFile name true)))
 
 (defn master-local-dir [conf]
-  (let [ret (str (conf STORM-LOCAL-DIR) "/nimbus")]
+  (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "nimbus")]
     (FileUtils/forceMkdir (File. ret))
     ret
     ))
 
 (defn master-stormdist-root
   ([conf]
-     (str (master-local-dir conf) "/stormdist"))
+     (str (master-local-dir conf) file-path-separator "stormdist"))
   ([conf storm-id]
-     (str (master-stormdist-root conf) "/" storm-id)))
+     (str (master-stormdist-root conf) file-path-separator storm-id)))
 
 (defn master-stormjar-path [stormroot]
-  (str stormroot "/stormjar.jar"))
+  (str stormroot file-path-separator "stormjar.jar"))
 
 (defn master-stormcode-path [stormroot]
-  (str stormroot "/stormcode.ser"))
+  (str stormroot file-path-separator "stormcode.ser"))
 
 (defn master-stormconf-path [stormroot]
-  (str stormroot "/stormconf.ser"))
+  (str stormroot file-path-separator "stormconf.ser"))
 
 (defn master-inbox [conf]
-  (let [ret (str (master-local-dir conf) "/inbox")]
+  (let [ret (str (master-local-dir conf) file-path-separator "inbox")]
     (FileUtils/forceMkdir (File. ret))
     ret ))
 
 (defn master-inimbus-dir [conf]
-  (str (master-local-dir conf) "/inimbus"))
+  (str (master-local-dir conf) file-path-separator "inimbus"))
 
 (defn supervisor-local-dir [conf]
-  (let [ret (str (conf STORM-LOCAL-DIR) "/supervisor")]
+  (let [ret (str (conf STORM-LOCAL-DIR) file-path-separator "supervisor")]
     (FileUtils/forceMkdir (File. ret))
     ret
     ))
 
 (defn supervisor-isupervisor-dir [conf]
-  (str (supervisor-local-dir conf) "/isupervisor"))
+  (str (supervisor-local-dir conf) file-path-separator "isupervisor"))
 
 (defn supervisor-stormdist-root
-  ([conf] (str (supervisor-local-dir conf) "/stormdist"))
+  ([conf] (str (supervisor-local-dir conf) file-path-separator "stormdist"))
   ([conf storm-id]
-      (str (supervisor-stormdist-root conf) "/" (java.net.URLEncoder/encode storm-id))))
+      (str (supervisor-stormdist-root conf) file-path-separator (java.net.URLEncoder/encode storm-id))))
 
 (defn supervisor-stormjar-path [stormroot]
-  (str stormroot "/stormjar.jar"))
+  (str stormroot file-path-separator "stormjar.jar"))
 
 (defn supervisor-stormcode-path [stormroot]
-  (str stormroot "/stormcode.ser"))
+  (str stormroot file-path-separator "stormcode.ser"))
 
 (defn supervisor-stormconf-path [stormroot]
-  (str stormroot "/stormconf.ser"))
+  (str stormroot file-path-separator "stormconf.ser"))
 
 (defn supervisor-tmp-dir [conf]
-  (let [ret (str (supervisor-local-dir conf) "/tmp")]
+  (let [ret (str (supervisor-local-dir conf) file-path-separator "tmp")]
     (FileUtils/forceMkdir (File. ret))
     ret ))
 
 (defn supervisor-storm-resources-path [stormroot]
-  (str stormroot "/" RESOURCES-SUBDIR))
+  (str stormroot file-path-separator RESOURCES-SUBDIR))
 
 (defn ^LocalState supervisor-state [conf]
-  (LocalState. (str (supervisor-local-dir conf) "/localstate")))
+  (LocalState. (str (supervisor-local-dir conf) file-path-separator "localstate")))
 
 (defn read-supervisor-storm-conf [conf storm-id]
   (let [stormroot (supervisor-stormdist-root conf storm-id)
@@ -146,20 +146,20 @@
 
 (defn worker-root
   ([conf]
-     (str (conf STORM-LOCAL-DIR) "/workers"))
+     (str (conf STORM-LOCAL-DIR) file-path-separator "workers"))
   ([conf id]
-     (str (worker-root conf) "/" id)))
+     (str (worker-root conf) file-path-separator id)))
 
 (defn worker-pids-root
   [conf id]
-  (str (worker-root conf id) "/pids"))
+  (str (worker-root conf id) file-path-separator "pids"))
 
 (defn worker-pid-path [conf id pid]
-  (str (worker-pids-root conf id) "/" pid))
+  (str (worker-pids-root conf id) file-path-separator pid))
 
 (defn worker-heartbeats-root
   [conf id]
-  (str (worker-root conf id) "/heartbeats"))
+  (str (worker-root conf id) file-path-separator "heartbeats"))
 
 ;; workers heartbeat here with pid and timestamp
 ;; if supervisor stops receiving heartbeat, it kills and restarts the process

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index fda038f..40a5574 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -149,7 +149,10 @@
     (rmpath (worker-root conf id))
   (catch RuntimeException e
     (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
-    )))
+    )
+  (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
+  (catch java.io.IOException e (log-message (.getMessage e)))
+    ))
 
 (defn shutdown-worker [supervisor id]
   (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
@@ -160,7 +163,9 @@
       (psim/kill-process thread-pid))
     (doseq [pid pids]
       (ensure-process-killed! pid)
-      (rmpath (worker-pid-path conf id pid))
+      (try
+        (rmpath (worker-pid-path conf id pid))
+        (catch Exception e)) ;; on windows, the supervisor may still holds the lock on the worker directory
       )
     (try-cleanup-worker conf id))
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
@@ -258,6 +263,13 @@
        (map :storm-id)
        set))
 
+(defn try-shutdown-workers [supervisor]
+  (let [worker-root (worker-root (:conf supervisor))
+         ids (read-dir-contents worker-root)]
+    (doseq [id ids]
+      (shutdown-worker supervisor id))
+    ))
+
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
@@ -317,7 +329,10 @@
         (when-not (assigned-storm-ids storm-id)
           (log-message "Removing code for storm id "
                        storm-id)
-          (rmr (supervisor-stormdist-root conf storm-id))
+          (try
+            (if on-windows? (try-shutdown-workers supervisor))
+            (rmr (supervisor-stormdist-root conf storm-id))
+            (catch Exception e (log-message (.getMessage e))))
           ))
       (.add processes-event-manager sync-processes)
       )))
@@ -394,7 +409,7 @@
 (defmethod download-storm-code
     :distributed [conf storm-id master-code-dir]
     ;; Downloading to permanent location is atomic
-    (let [tmproot (str (supervisor-tmp-dir conf) "/" (uuid))
+    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
           stormroot (supervisor-stormdist-root conf storm-id)]
       (FileUtils/forceMkdir (File. tmproot))
       
@@ -444,7 +459,7 @@
       (let [classloader (.getContextClassLoader (Thread/currentThread))
             resources-jar (resources-jar)
             url (.getResource classloader RESOURCES-SUBDIR)
-            target-dir (str stormroot "/" RESOURCES-SUBDIR)]
+            target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
             (cond
               resources-jar
               (do

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/testing.clj b/src/clj/backtype/storm/testing.clj
index 563351b..4e3c8a9 100644
--- a/src/clj/backtype/storm/testing.clj
+++ b/src/clj/backtype/storm/testing.clj
@@ -32,7 +32,7 @@
   (FeederSpout. (Fields. fields)))
 
 (defn local-temp-path []
-  (str (System/getProperty "java.io.tmpdir") "/" (uuid)))
+  (str (System/getProperty "java.io.tmpdir") (if-not on-windows? "/") (uuid)))
 
 (defn delete-all [paths]
   (dorun
@@ -161,7 +161,9 @@
   (log-message "Done shutting down in process zookeeper")
   (doseq [t @(:tmp-dirs cluster-map)]
     (log-message "Deleting temporary path " t)
-    (rmr t)
+    (try
+      (rmr t)
+      (catch Exception e (log-message (.getMessage e)))) ;; on windows, the host process still holds lock on the logfile
     ))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/ui/core.clj b/src/clj/backtype/storm/ui/core.clj
index ceb54fd..b3b9932 100644
--- a/src/clj/backtype/storm/ui/core.clj
+++ b/src/clj/backtype/storm/ui/core.clj
@@ -6,6 +6,7 @@
   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]])
   (:use [ring.adapter.jetty :only [run-jetty]])
   (:use [clojure.string :only [trim]])
+  (:use [clojure.java.shell :only [sh]])
   (:import [backtype.storm.generated ExecutorSpecificStats
             ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
             ErrorInfo ClusterSummary SupervisorSummary TopologySummary
@@ -733,9 +734,11 @@
            ui-template))
   (GET "/topology/:id" [:as {cookies :cookies} id & m]
        (let [include-sys? (get-include-sys? cookies)]
-         (-> (topology-page id (:window m) include-sys?)
+         (try
+           (-> (topology-page id (:window m) include-sys?)
              (concat [(mk-system-toggle-button include-sys?)])
-             ui-template)))
+             ui-template)
+           (catch Exception e (resp/redirect "/")))))
   (GET "/topology/:id/component/:component" [:as {cookies :cookies} id component & m]
        (let [include-sys? (get-include-sys? cookies)]
          (-> (component-page id component (:window m) include-sys?)
@@ -773,7 +776,27 @@
         (.killTopologyWithOpts nimbus name options)
         (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
     (resp/redirect (str "/topology/" id)))
+  (ring.middleware.multipart-params/wrap-multipart-params
+    (POST "/upload" [:as {params :params} :as {headers :headers}]
+      (let [file-params (params :file)
+            src (.getCanonicalPath (file-params :tempfile))
+            dest (if-not (nil? name) (file-params :filename) name)]
+        (if on-windows? (sh "cmd" "/c" "move" "/y" src dest)
+          (sh "mv" "-f" src dest)))
+        (resp/redirect (get headers "referer"))))
+  (POST "/submit/:jar/:class/:args" [:as {params :params} :as {headers :headers} jar class args]
+    (let [cmd (str (if on-windows? "bin/storm.cmd" "bin/storm")
+                     " jar " (params :jar) " " (params :class) " "
+                     (clojure.string/replace (params :args) #"&" " "))]
+      (exec-command! cmd)
+      (resp/redirect (get headers "referer"))))
+  (POST "/dir/:args" [:as {cookies :cookies} args]
+    (let [jars-str (if on-windows? (:out (sh "cmd" "/c" "dir" "/b" args))
+                      (:out (sh "ls" args)))
+          jars (clojure.string/split jars-str (if on-windows? #"\r\n" #"\n"))]
+        jars-str))
   (route/resources "/")
+  (route/files "/" {:root (str (System/getProperty "storm.home") file-path-separator "webapp") })
   (route/not-found "Page not found"))
 
 (defn exception->html [ex]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/util.clj b/src/clj/backtype/storm/util.clj
index af11137..d79dce8 100644
--- a/src/clj/backtype/storm/util.clj
+++ b/src/clj/backtype/storm/util.clj
@@ -22,6 +22,15 @@
   (:use [backtype.storm log])
   )
 
+(def on-windows?
+  (= "Windows_NT" (System/getenv "OS")))
+
+(def file-path-separator
+  (System/getProperty "file.separator"))
+
+(def class-path-separator
+  (System/getProperty "path.separator"))
+
 (defmacro defalias
   "Defines an alias for a var: a new var with the same root binding (if
   any) and similar metadata. The metadata of the alias is its initial
@@ -334,7 +343,7 @@
 (defn ensure-process-killed! [pid]
   ;; TODO: should probably do a ps ax of some sort to make sure it was killed
   (try-cause
-    (exec-command! (str "kill -9 " pid))
+    (exec-command! (str (if on-windows? "taskkill /f /pid " "kill -9 ") pid))
   (catch ExecuteException e
     (log-message "Error when trying to kill " pid ". Process is probably already dead."))
     ))
@@ -429,7 +438,8 @@
 
 (defn touch [path]
   (log-debug "Touching file at " path)
-  (let [success? (.createNewFile (File. path))]
+  (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path))))
+                     (.createNewFile (File. path)))]
     (when-not success?
       (throw (RuntimeException. (str "Failed to touch " path))))
     ))
@@ -447,7 +457,7 @@
   (System/getProperty "java.class.path"))
 
 (defn add-to-classpath [classpath paths]
-  (str/join ":" (cons classpath paths)))
+  (str/join class-path-separator (cons classpath paths)))
 
 (defn ^ReentrantReadWriteLock mk-rw-lock []
   (ReentrantReadWriteLock.))
@@ -695,7 +705,7 @@
 
 (defn zip-contains-dir? [zipfile target]
   (let [entries (->> zipfile (ZipFile.) .entries enumeration-seq (map (memfn getName)))]
-    (some? #(.startsWith % (str target "/")) entries)
+    (some? #(.startsWith % (str target file-path-separator)) entries)
     ))
 
 (defn url-encode [s]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6537459c/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/src/clj/backtype/storm/zookeeper.clj b/src/clj/backtype/storm/zookeeper.clj
index 76858a7..7c5d8d8 100644
--- a/src/clj/backtype/storm/zookeeper.clj
+++ b/src/clj/backtype/storm/zookeeper.clj
@@ -6,7 +6,7 @@
             ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
             Watcher$Event$EventType KeeperException$NodeExistsException])
   (:import [org.apache.zookeeper.data Stat])
-  (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory])
+  (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
   (:import [java.net InetSocketAddress BindException])
   (:import [java.io File])
   (:import [backtype.storm.utils Utils ZookeeperAuthInfo])
@@ -132,7 +132,7 @@
   (let [localfile (File. localdir)
         zk (ZooKeeperServer. localfile localfile 2000)
         [retport factory] (loop [retport (if port port 2000)]
-                            (if-let [factory-tmp (try-cause (NIOServerCnxn$Factory. (InetSocketAddress. retport))
+                            (if-let [factory-tmp (try-cause (NIOServerCnxnFactory/createFactory retport 60) ;; 60 is the default maxclientcnxns
                                               (catch BindException e
                                                 (when (> (inc retport) (if port port 65535))
                                                   (throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]


Mime
View raw message