storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/14] move towards idiomatic Clojure style
Date Thu, 12 Jun 2014 21:10:59 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/security a762f1c5f -> 92e3a5742


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 6714fcd..6e78cca 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -13,6 +13,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.util
   (:import [java.net InetAddress])
   (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
@@ -36,10 +37,10 @@
   (:require [clojure.java.io :as io])
   (:use [clojure walk])
   (:use [backtype.storm log])
-  )
+  (:refer-clojure :exclude [some?]))
 
 (defn wrap-in-runtime
-  "Wraps an exception in a RuntimeException if needed" 
+  "Wraps an exception in a RuntimeException if needed"
   [^Exception e]
   (if (instance? RuntimeException e)
     e
@@ -59,64 +60,63 @@
   any) and similar metadata. The metadata of the alias is its initial
   metadata (as provided by def) merged into the metadata of the original."
   ([name orig]
-     `(do
-        (alter-meta!
-         (if (.hasRoot (var ~orig))
-           (def ~name (.getRawRoot (var ~orig)))
-           (def ~name))
-         ;; When copying metadata, disregard {:macro false}.
-         ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
-         #(conj (dissoc % :macro)
-                (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
-        (var ~name)))
+   `(do
+      (alter-meta!
+        (if (.hasRoot (var ~orig))
+          (def ~name (.getRawRoot (var ~orig)))
+          (def ~name))
+        ;; When copying metadata, disregard {:macro false}.
+        ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+        #(conj (dissoc % :macro)
+               (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+      (var ~name)))
   ([name orig doc]
-     (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+   (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
 
 ;; name-with-attributes by Konrad Hinsen:
 (defn name-with-attributes
   "To be used in macro definitions.
-   Handles optional docstrings and attribute maps for a name to be defined
-   in a list of macro arguments. If the first macro argument is a string,
-   it is added as a docstring to name and removed from the macro argument
-   list. If afterwards the first macro argument is a map, its entries are
-   added to the name's metadata map and the map is removed from the
-   macro argument list. The return value is a vector containing the name
-   with its extended metadata map and the list of unprocessed macro
-   arguments."
+  Handles optional docstrings and attribute maps for a name to be defined
+  in a list of macro arguments. If the first macro argument is a string,
+  it is added as a docstring to name and removed from the macro argument
+  list. If afterwards the first macro argument is a map, its entries are
+  added to the name's metadata map and the map is removed from the
+  macro argument list. The return value is a vector containing the name
+  with its extended metadata map and the list of unprocessed macro
+  arguments."
   [name macro-args]
   (let [[docstring macro-args] (if (string? (first macro-args))
                                  [(first macro-args) (next macro-args)]
                                  [nil macro-args])
-    [attr macro-args]          (if (map? (first macro-args))
-                                 [(first macro-args) (next macro-args)]
-                                 [{} macro-args])
-    attr                       (if docstring
-                                 (assoc attr :doc docstring)
-                                 attr)
-    attr                       (if (meta name)
-                                 (conj (meta name) attr)
-                                 attr)]
+        [attr macro-args] (if (map? (first macro-args))
+                            [(first macro-args) (next macro-args)]
+                            [{} macro-args])
+        attr (if docstring
+               (assoc attr :doc docstring)
+               attr)
+        attr (if (meta name)
+               (conj (meta name) attr)
+               attr)]
     [(with-meta name attr) macro-args]))
 
 (defmacro defnk
- "Define a function accepting keyword arguments. Symbols up to the first
- keyword in the parameter list are taken as positional arguments.  Then
- an alternating sequence of keywords and defaults values is expected. The
- values of the keyword arguments are available in the function body by
- virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
- defnk accepts an optional docstring as well as an optional metadata map."
- [fn-name & fn-tail]
- (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
-       [pos kw-vals]           (split-with symbol? args)
-       syms                    (map #(-> % name symbol) (take-nth 2 kw-vals))
-       values                  (take-nth 2 (rest kw-vals))
-       sym-vals                (apply hash-map (interleave syms values))
-       de-map                  {:keys (vec syms)
-                                :or   sym-vals}]
-   `(defn ~fn-name
-      [~@pos & options#]
-      (let [~de-map (apply hash-map options#)]
-        ~@body))))
+  "Define a function accepting keyword arguments. Symbols up to the first
+  keyword in the parameter list are taken as positional arguments.  Then
+  an alternating sequence of keywords and defaults values is expected. The
+  values of the keyword arguments are available in the function body by
+  virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+  defnk accepts an optional docstring as well as an optional metadata map."
+  [fn-name & fn-tail]
+  (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+        [pos kw-vals] (split-with symbol? args)
+        syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+        values (take-nth 2 (rest kw-vals))
+        sym-vals (apply hash-map (interleave syms values))
+        de-map {:keys (vec syms) :or sym-vals}]
+    `(defn ~fn-name
+       [~@pos & options#]
+       (let [~de-map (apply hash-map options#)]
+         ~@body))))
 
 (defn find-first
   "Returns the first item of coll for which (pred item) returns logical true.
@@ -149,200 +149,203 @@
 
 (defn positions
   "Returns a lazy sequence containing the positions at which pred
-   is true for items in coll."
+  is true for items in coll."
   [pred coll]
   (for [[idx elt] (indexed coll) :when (pred elt)] idx))
 
-(defn exception-cause? [klass ^Throwable t]
+(defn exception-cause?
+  [klass ^Throwable t]
   (->> (iterate #(.getCause ^Throwable %) t)
        (take-while identity)
        (some (partial instance? klass))
        boolean))
 
-(defmacro thrown-cause? [klass & body]
+(defmacro thrown-cause?
+  [klass & body]
   `(try
-    ~@body
-    false
-    (catch Throwable t#
-      (exception-cause? ~klass t#))))
+     ~@body
+     false
+     (catch Throwable t#
+       (exception-cause? ~klass t#))))
 
-(defmacro thrown-cause-with-msg? [klass re & body]
+(defmacro thrown-cause-with-msg?
+  [klass re & body]
   `(try
-    ~@body
-    false
-    (catch Throwable t#
-      (and (re-matches ~re (.getMessage t#))
-        (exception-cause? ~klass t#)))))
+     ~@body
+     false
+     (catch Throwable t#
+       (and (re-matches ~re (.getMessage t#))
+            (exception-cause? ~klass t#)))))
 
-(defmacro forcat [[args aseq] & body]
+(defmacro forcat
+  [[args aseq] & body]
   `(mapcat (fn [~args]
              ~@body)
            ~aseq))
 
-(defmacro try-cause [& body]
+(defmacro try-cause
+  [& body]
   (let [checker (fn [form]
                   (or (not (sequential? form))
                       (not= 'catch (first form))))
         [code guards] (split-with checker body)
         error-local (gensym "t")
         guards (forcat [[_ klass local & guard-body] guards]
-                 `((exception-cause? ~klass ~error-local)
-                   (let [~local ~error-local]
-                     ~@guard-body
-                     )))
-        ]
+                       `((exception-cause? ~klass ~error-local)
+                         (let [~local ~error-local]
+                           ~@guard-body
+                           )))]
     `(try ~@code
-          (catch Throwable ~error-local
-            (cond ~@guards
-                  true (throw ~error-local)
-                  )))))
+       (catch Throwable ~error-local
+         (cond ~@guards
+               true (throw ~error-local)
+               )))))
 
-(defn local-hostname []
+(defn local-hostname
+  []
   (.getCanonicalHostName (InetAddress/getLocalHost)))
 
 (letfn [(try-port [port]
-          (with-open [socket (java.net.ServerSocket. port)]
-            (.getLocalPort socket)))]
+                  (with-open [socket (java.net.ServerSocket. port)]
+                    (.getLocalPort socket)))]
   (defn available-port
     ([] (try-port 0))
     ([preferred]
-      (try
-        (try-port preferred)
-        (catch java.io.IOException e
-          (available-port))))))
+     (try
+       (try-port preferred)
+       (catch java.io.IOException e
+         (available-port))))))
 
 (defn uuid []
   (str (UUID/randomUUID)))
 
-(defn current-time-secs []
+(defn current-time-secs
+  []
   (Time/currentTimeSecs))
 
-(defn current-time-millis []
+(defn current-time-millis
+  []
   (Time/currentTimeMillis))
 
-(defn secs-to-millis-long [secs]
+(defn secs-to-millis-long
+  [secs]
   (long (* (long 1000) secs)))
 
-(defn clojurify-structure [s]
+(defn clojurify-structure
+  [s]
   (prewalk (fn [x]
-              (cond (instance? Map x) (into {} x)
-                    (instance? List x) (vec x)
-                    true x))
+             (cond (instance? Map x) (into {} x)
+                   (instance? List x) (vec x)
+                   true x))
            s))
 
-(defmacro with-file-lock [path & body]
+(defmacro with-file-lock
+  [path & body]
   `(let [f# (File. ~path)
          _# (.createNewFile f#)
          rf# (RandomAccessFile. f# "rw")
          lock# (.. rf# (getChannel) (lock))]
-      (try
-        ~@body
-        (finally
-          (.release lock#)
-          (.close rf#))
-        )))
-
-(defn tokenize-path [^String path]
+     (try
+       ~@body
+       (finally
+         (.release lock#)
+         (.close rf#)))))
+
+(defn tokenize-path
+  [^String path]
   (let [toks (.split path "/")]
-    (vec (filter (complement empty?) toks))
-    ))
+    (vec (filter (complement empty?) toks))))
 
-(defn assoc-conj [m k v]
+(defn assoc-conj
+  [m k v]
   (merge-with concat m {k [v]}))
 
 ;; returns [ones in first set not in second, ones in second set not in first]
-(defn set-delta [old curr]
+(defn set-delta
+  [old curr]
   (let [s1 (set old)
         s2 (set curr)]
-    [(set/difference s1 s2) (set/difference s2 s1)]
-    ))
+    [(set/difference s1 s2) (set/difference s2 s1)]))
 
-(defn parent-path [path]
+(defn parent-path
+  [path]
   (let [toks (tokenize-path path)]
-    (str "/" (str/join "/" (butlast toks)))
-    ))
+    (str "/" (str/join "/" (butlast toks)))))
 
-(defn toks->path [toks]
-  (str "/" (str/join "/" toks))
-  )
+(defn toks->path
+  [toks]
+  (str "/" (str/join "/" toks)))
 
-(defn normalize-path [^String path]
+(defn normalize-path
+  [^String path]
   (toks->path (tokenize-path path)))
 
-(defn map-val [afn amap]
+(defn map-val
+  [afn amap]
   (into {}
-    (for [[k v] amap]
-      [k (afn v)]
-      )))
+        (for [[k v] amap]
+          [k (afn v)])))
 
-(defn filter-val [afn amap]
-  (into {}
-    (filter
-      (fn [[k v]]
-        (afn v))
-       amap
-       )))
+(defn filter-val
+  [afn amap]
+  (into {} (filter (fn [[k v]] (afn v)) amap)))
 
-(defn filter-key [afn amap]
-  (into {}
-    (filter
-      (fn [[k v]]
-        (afn k))
-       amap
-       )))
+(defn filter-key
+  [afn amap]
+  (into {} (filter (fn [[k v]] (afn k)) amap)))
 
-(defn map-key [afn amap]
-  (into {}
-    (for [[k v] amap]
-      [(afn k) v]
-      )))
+(defn map-key
+  [afn amap]
+  (into {} (for [[k v] amap] [(afn k) v])))
 
-(defn separate [pred aseq]
+(defn separate
+  [pred aseq]
   [(filter pred aseq) (filter (complement pred) aseq)])
 
-(defn full-path [parent name]
+(defn full-path
+  [parent name]
   (let [toks (tokenize-path parent)]
-    (toks->path (conj toks name))
-    ))
+    (toks->path (conj toks name))))
 
 (def not-nil? (complement nil?))
 
-(defn barr [& vals]
+(defn barr
+  [& vals]
   (byte-array (map byte vals)))
 
-(defn halt-process! [val & msg]
+(defn halt-process!
+  [val & msg]
   (log-message "Halting process: " msg)
-  (.halt (Runtime/getRuntime) val)
-  )
+  (.halt (Runtime/getRuntime) val))
 
-(defn sum [vals]
+(defn sum
+  [vals]
   (reduce + vals))
 
 (defn repeat-seq
   ([aseq]
-    (apply concat (repeat aseq)))
+   (apply concat (repeat aseq)))
   ([amt aseq]
-    (apply concat (repeat amt aseq))
-    ))
+   (apply concat (repeat amt aseq))))
 
 (defn div
   "Perform floating point division on the arguments."
-  [f & rest] (apply / (double f) rest))
+  [f & rest]
+  (apply / (double f) rest))
 
-(defn defaulted [val default]
+(defn defaulted
+  [val default]
   (if val val default))
 
 (defn mk-counter
   ([] (mk-counter 1))
   ([start-val]
-     (let [val (atom (dec start-val))]
-       (fn []
-         (swap! val inc)))))
+   (let [val (atom (dec start-val))]
+     (fn [] (swap! val inc)))))
 
 (defmacro for-times [times & body]
   `(for [i# (range ~times)]
-     ~@body
-     ))
+     ~@body))
 
 (defmacro dofor [& body]
   `(doall (for ~@body)))
@@ -351,9 +354,9 @@
   "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
   [amap]
   (reduce (fn [m [k v]]
-    (let [existing (get m v [])]
-      (assoc m v (conj existing k))))
-    {} amap))
+            (let [existing (get m v [])]
+              (assoc m v (conj existing k))))
+          {} amap))
 
 (defmacro print-vars [& vars]
   (let [prints (for [v vars] `(println ~(str v) ~v))]
@@ -366,16 +369,14 @@
         split (.split name "@")]
     (when-not (= 2 (count split))
       (throw (RuntimeException. (str "Got unexpected process name: " name))))
-    (first split)
-    ))
+    (first split)))
 
 (defn exec-command! [command]
   (let [[comm-str & args] (seq (.split command " "))
         command (CommandLine. comm-str)]
     (doseq [a args]
       (.addArgument command a))
-    (.execute (DefaultExecutor.) command)
-    ))
+    (.execute (DefaultExecutor.) command)))
 
 (defn extract-dir-from-jar [jarpath dir destdir]
   (try-cause
@@ -385,25 +386,22 @@
           (.mkdirs (.getParentFile (File. destdir (.getName file))))
           (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
             (io/copy (.getInputStream jarpath file) out)))))
-  (catch IOException e
-    (log-message "Could not extract " dir " from " jarpath))
-  ))
+    (catch IOException e
+      (log-message "Could not extract " dir " from " jarpath))))
 
 (defn ensure-process-killed! [pid]
   ;; TODO: should probably do a ps ax of some sort to make sure it was killed
   (try-cause
     (exec-command! (str (if on-windows? "taskkill /f /pid " "kill -9 ") pid))
-  (catch ExecuteException e
-    (log-message "Error when trying to kill " pid ". Process is probably already dead."))
-    ))
+    (catch ExecuteException e
+      (log-message "Error when trying to kill " pid ". Process is probably already dead."))))
 
 (defnk launch-process [command :environment {}]
   (let [builder (ProcessBuilder. command)
         process-env (.environment builder)]
     (doseq [[k v] environment]
       (.put process-env k v))
-    (.start builder)
-    ))
+    (.start builder)))
 
 (defn sleep-secs [secs]
   (when (pos? secs)
@@ -427,23 +425,21 @@
                    :start true
                    :thread-name nil]
   (let [thread (Thread.
-                (fn []
-                  (try-cause
-                    (let [afn (if factory? (afn) afn)]
-                      (loop []
-                        (let [sleep-time (afn)]
-                          (when-not (nil? sleep-time)
-                            (sleep-secs sleep-time)
-                            (recur))
-                          )))
-                    (catch InterruptedException e
-                      (log-message "Async loop interrupted!")
-                      )
-                    (catch Throwable t
-                      (log-error t "Async loop died!")
-                      (kill-fn t)
-                      ))
-                  ))]
+                 (fn []
+                   (try-cause
+                     (let [afn (if factory? (afn) afn)]
+                       (loop []
+                         (let [sleep-time (afn)]
+                           (when-not (nil? sleep-time)
+                             (sleep-secs sleep-time)
+                             (recur))
+                           )))
+                     (catch InterruptedException e
+                       (log-message "Async loop interrupted!")
+                       )
+                     (catch Throwable t
+                       (log-error t "Async loop died!")
+                       (kill-fn t)))))]
     (.setDaemon thread daemon)
     (.setPriority thread priority)
     (when thread-name
@@ -452,26 +448,30 @@
       (.start thread))
     ;; should return object that supports stop, interrupt, join, and waiting?
     (reify SmartThread
-      (start [this]
+      (start
+        [this]
         (.start thread))
-      (join [this]
+      (join
+        [this]
         (.join thread))
-      (interrupt [this]
+      (interrupt
+        [this]
         (.interrupt thread))
-      (sleeping? [this]
-        (Time/isThreadWaiting thread)
-        ))
-      ))
+      (sleeping?
+        [this]
+        (Time/isThreadWaiting thread)))))
 
-(defn exists-file? [path]
+(defn exists-file?
+  [path]
   (.exists (File. path)))
 
-(defn rmr [path]
+(defn rmr
+  [path]
   (log-debug "Rmr path " path)
   (when (exists-file? path)
     (try
       (FileUtils/forceDelete (File. path))
-    (catch FileNotFoundException e))))
+      (catch FileNotFoundException e))))
 
 (defn rmpath
   "Removes file or directory at the path. Not recursive. Throws exception on failure"
@@ -480,107 +480,123 @@
   (when (exists-file? path)
     (let [deleted? (.delete (File. path))]
       (when-not deleted?
-        (throw (RuntimeException. (str "Failed to delete " path))))
-      )))
+        (throw (RuntimeException. (str "Failed to delete " path)))))))
 
 (defn local-mkdirs
   [path]
   (log-debug "Making dirs at " path)
   (FileUtils/forceMkdir (File. path)))
 
-(defn touch [path]
+(defn touch
+  [path]
   (log-debug "Touching file at " path)
   (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path))))
-                     (.createNewFile (File. path)))]
+                   (.createNewFile (File. path)))]
     (when-not success?
-      (throw (RuntimeException. (str "Failed to touch " path))))
-    ))
+      (throw (RuntimeException. (str "Failed to touch " path))))))
 
-(defn read-dir-contents [dir]
+(defn read-dir-contents
+  [dir]
   (if (exists-file? dir)
     (let [content-files (.listFiles (File. dir))]
       (map #(.getName ^File %) content-files))
-    [] ))
+    []))
 
-(defn compact [aseq]
+(defn compact
+  [aseq]
   (filter (complement nil?) aseq))
 
-(defn current-classpath []
+(defn current-classpath
+  []
   (System/getProperty "java.class.path"))
 
-(defn add-to-classpath [classpath paths]
+(defn add-to-classpath
+  [classpath paths]
   (str/join class-path-separator (cons classpath paths)))
 
-(defn ^ReentrantReadWriteLock mk-rw-lock []
+(defn ^ReentrantReadWriteLock mk-rw-lock
+  []
   (ReentrantReadWriteLock.))
 
-(defmacro read-locked [rw-lock & body]
+(defmacro read-locked
+  [rw-lock & body]
   (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
     `(let [rlock# (.readLock ~lock)]
        (try (.lock rlock#)
-            ~@body
-            (finally (.unlock rlock#))))))
+         ~@body
+         (finally (.unlock rlock#))))))
 
-(defmacro write-locked [rw-lock & body]
+(defmacro write-locked
+  [rw-lock & body]
   (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
     `(let [wlock# (.writeLock ~lock)]
        (try (.lock wlock#)
-            ~@body
-            (finally (.unlock wlock#))))))
+         ~@body
+         (finally (.unlock wlock#))))))
 
-(defn wait-for-condition [apredicate]
+(defn wait-for-condition
+  [apredicate]
   (while (not (apredicate))
-    (Time/sleep 100)
-    ))
+    (Time/sleep 100)))
 
-(defn some? [pred aseq]
+(defn some?
+  [pred aseq]
   ((complement nil?) (some pred aseq)))
 
-(defn time-delta [time-secs]
+(defn time-delta
+  [time-secs]
   (- (current-time-secs) time-secs))
 
-(defn time-delta-ms [time-ms]
+(defn time-delta-ms
+  [time-ms]
   (- (System/currentTimeMillis) (long time-ms)))
 
-(defn parse-int [str]
+(defn parse-int
+  [str]
   (Integer/valueOf str))
 
-(defn integer-divided [sum num-pieces]
+(defn integer-divided
+  [sum num-pieces]
   (clojurify-structure (Utils/integerDivided sum num-pieces)))
 
-(defn collectify [obj]
-  (if (or (sequential? obj) (instance? Collection obj)) obj [obj]))
+(defn collectify
+  [obj]
+  (if (or (sequential? obj) (instance? Collection obj))
+    obj
+    [obj]))
 
-(defn to-json [obj]
+(defn to-json
+  [obj]
   (JSONValue/toJSONString obj))
 
-(defn from-json [^String str]
+(defn from-json
+  [^String str]
   (if str
     (clojurify-structure
-     (JSONValue/parse str))
-    nil
-    ))
-
-(defmacro letlocals [& body]
-   (let [[tobind lexpr] (split-at (dec (count body)) body)
-         binded (vec (mapcat (fn [e]
-                  (if (and (list? e) (= 'bind (first e)))
-                     [(second e) (last e)]
-                     ['_ e]
-                     ))
-                  tobind ))]
-     `(let ~binded
-         ~(first lexpr)
-      )))
-
-(defn remove-first [pred aseq]
+      (JSONValue/parse str))
+    nil))
+
+(defmacro letlocals
+  [& body]
+  (let [[tobind lexpr] (split-at (dec (count body)) body)
+        binded (vec (mapcat (fn [e]
+                              (if (and (list? e) (= 'bind (first e)))
+                                [(second e) (last e)]
+                                ['_ e]
+                                ))
+                            tobind))]
+    `(let ~binded
+       ~(first lexpr))))
+
+(defn remove-first
+  [pred aseq]
   (let [[b e] (split-with (complement pred) aseq)]
     (when (empty? e)
       (throw (IllegalArgumentException. "Nothing to remove")))
-    (concat b (rest e))
-    ))
+    (concat b (rest e))))
 
-(defn assoc-non-nil [m k v]
+(defn assoc-non-nil
+  [m k v]
   (if v (assoc m k v) m))
 
 (defn multi-set
@@ -589,76 +605,71 @@
   (apply merge-with +
          (map #(hash-map % 1) aseq)))
 
-(defn set-var-root* [avar val]
+(defn set-var-root*
+  [avar val]
   (alter-var-root avar (fn [avar] val)))
 
-(defmacro set-var-root [var-sym val]
+(defmacro set-var-root
+  [var-sym val]
   `(set-var-root* (var ~var-sym) ~val))
 
-(defmacro with-var-roots [bindings & body]
+(defmacro with-var-roots
+  [bindings & body]
   (let [settings (partition 2 bindings)
         tmpvars (repeatedly (count settings) (partial gensym "old"))
         vars (map first settings)
         savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
         setters (for [[v s] settings] `(set-var-root ~v ~s))
-        restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)
-        ]
+        restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)]
     `(let ~savevals
-      ~@setters
-      (try
-        ~@body
-      (finally
-        ~@restorers))
-      )))
+       ~@setters
+       (try
+         ~@body
+         (finally
+           ~@restorers)))))
 
 (defn map-diff
   "Returns mappings in m2 that aren't in m1"
   [m1 m2]
-  (into {}
-    (filter
-      (fn [[k v]] (not= v (m1 k)))
-      m2
-      )))
-
-
-(defn select-keys-pred [pred amap]
-  (into {}
-        (filter
-         (fn [[k v]]
-           (pred k))
-         amap)))
+  (into {} (filter (fn [[k v]] (not= v (m1 k))) m2)))
 
+(defn select-keys-pred
+  [pred amap]
+  (into {} (filter (fn [[k v]] (pred k)) amap)))
 
-(defn rotating-random-range [choices]
+(defn rotating-random-range
+  [choices]
   (let [rand (Random.)
         choices (ArrayList. choices)]
     (Collections/shuffle choices rand)
     [(MutableInt. -1) choices rand]))
 
-(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
+(defn acquire-random-range-id
+  [[^MutableInt curr ^List state ^Random rand]]
   (when (>= (.increment curr) (.size state))
     (.set curr 0)
     (Collections/shuffle state rand))
   (.get state (.get curr)))
 
 ; this can be rewritten to be tail recursive
-(defn interleave-all [& colls]
+(defn interleave-all
+  [& colls]
   (if (empty? colls)
     []
     (let [colls (filter (complement empty?) colls)
           my-elems (map first colls)
           rest-elems (apply interleave-all (map rest colls))]
-      (concat my-elems rest-elems)
-      )))
+      (concat my-elems rest-elems))))
 
-(defn update [m k afn]
+(defn update
+  [m k afn]
   (assoc m k (afn (get m k))))
 
-(defn any-intersection [& sets]
+(defn any-intersection
+  [& sets]
   (let [elem->count (multi-set (apply concat sets))]
     (-> (filter-val #(> % 1) elem->count)
-        keys
-        )))
+        keys)))
 
 (defn between?
   "val >= lower and val <= upper"
@@ -666,19 +677,20 @@
   (and (>= val lower)
        (<= val upper)))
 
-(defmacro benchmark [& body]
+(defmacro benchmark
+  [& body]
   `(let [l# (doall (range 1000000))]
      (time
        (doseq [i# l#]
          ~@body))))
 
-(defn rand-sampler [freq]
+(defn rand-sampler
+  [freq]
   (let [r (java.util.Random.)]
-    (fn []
-      (= 0 (.nextInt r freq)))
-    ))
+    (fn [] (= 0 (.nextInt r freq)))))
 
-(defn even-sampler [freq]
+(defn even-sampler
+  [freq]
   (let [freq (int freq)
         start (int 0)
         r (java.util.Random.)
@@ -690,38 +702,42 @@
           (when (>= i freq)
             (.set curr start)
             (.set target (.nextInt r freq))))
-          (= (.get curr) (.get target)))
+        (= (.get curr) (.get target)))
       {:rate freq})))
 
-(defn sampler-rate [sampler]
+(defn sampler-rate
+  [sampler]
   (:rate (meta sampler)))
 
-(defn class-selector [obj & args] (class obj))
+(defn class-selector
+  [obj & args]
+  (class obj))
 
 (defn uptime-computer []
   (let [start-time (current-time-secs)]
-    (fn []
-      (time-delta start-time)
-      )))
+    (fn [] (time-delta start-time))))
 
 (defn stringify-error [error]
   (let [result (StringWriter.)
         printer (PrintWriter. result)]
     (.printStackTrace error printer)
-    (.toString result)
-    ))
+    (.toString result)))
 
-(defn nil-to-zero [v]
+(defn nil-to-zero
+  [v]
   (or v 0))
 
-(defn bit-xor-vals [vals]
+(defn bit-xor-vals
+  [vals]
   (reduce bit-xor 0 vals))
 
-(defmacro with-error-reaction [afn & body]
+(defmacro with-error-reaction
+  [afn & body]
   `(try ~@body
      (catch Throwable t# (~afn t#))))
 
-(defn container []
+(defn container
+  []
   (Container.))
 
 (defn container-set! [^Container container obj]
@@ -737,7 +753,8 @@
 (defn throw-runtime [& strs]
   (throw (RuntimeException. (apply str strs))))
 
-(defn redirect-stdio-to-slf4j! []
+(defn redirect-stdio-to-slf4j!
+  []
   ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here
   ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes
   ;; it might have something to do with being a child process
@@ -751,29 +768,32 @@
   ;;         true))
   (log-capture! "STDIO"))
 
-(defn spy [prefix val]
+(defn spy
+  [prefix val]
   (log-message prefix ": " val)
   val)
 
-(defn zip-contains-dir? [zipfile target]
+(defn zip-contains-dir?
+  [zipfile target]
   (let [entries (->> zipfile (ZipFile.) .entries enumeration-seq (map (memfn getName)))]
-    (some? #(.startsWith % (str target "/")) entries)
-    ))
+    (some? #(.startsWith % (str target "/")) entries)))
 
-(defn url-encode [s]
+(defn url-encode
+  [s]
   (java.net.URLEncoder/encode s "UTF-8"))
 
-(defn url-decode [s]
+(defn url-decode
+  [s]
   (java.net.URLDecoder/decode s "UTF-8"))
 
-(defn join-maps [& maps]
+(defn join-maps
+  [& maps]
   (let [all-keys (apply set/union (for [m maps] (-> m keys set)))]
-    (into {}
-      (for [k all-keys]
-        [k (for [m maps] (m k))]
-        ))))
+    (into {} (for [k all-keys]
+               [k (for [m maps] (m k))]))))
 
-(defn partition-fixed [max-num-chunks aseq]
+(defn partition-fixed
+  [max-num-chunks aseq]
   (if (zero? max-num-chunks)
     []
     (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
@@ -792,99 +812,115 @@
                    rest-data)))))))
 
 
-(defn assoc-apply-self [curr key afn]
+(defn assoc-apply-self
+  [curr key afn]
   (assoc curr key (afn curr)))
 
-(defmacro recursive-map [& forms]
+(defmacro recursive-map
+  [& forms]
   (->> (partition 2 forms)
        (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
        (concat `(-> {}))))
 
-(defn current-stack-trace []
+(defn current-stack-trace
+  []
   (->> (Thread/currentThread)
        .getStackTrace
        (map str)
-       (str/join "\n")
-       ))
+       (str/join "\n")))
 
-(defn get-iterator [^Iterable alist]
+(defn get-iterator
+  [^Iterable alist]
   (if alist (.iterator alist)))
 
-(defn iter-has-next? [^Iterator iter]
+(defn iter-has-next?
+  [^Iterator iter]
   (if iter (.hasNext iter) false))
 
-(defn iter-next [^Iterator iter]
+(defn iter-next
+  [^Iterator iter]
   (.next iter))
 
-(defmacro fast-list-iter [pairs & body]
+(defmacro fast-list-iter
+  [pairs & body]
   (let [pairs (partition 2 pairs)
         lists (map second pairs)
         elems (map first pairs)
         iters (map (fn [_] (gensym)) lists)
-        bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists) (apply concat))
+        bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists)
+                      (apply concat))
         tests (map (fn [i] `(iter-has-next? ~i)) iters)
-        assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters) (apply concat))]
+        assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters)
+                         (apply concat))]
     `(let [~@bindings]
        (while (and ~@tests)
          (let [~@assignments]
-           ~@body
-           )))))
+           ~@body)))))
 
-(defn fast-list-map [afn alist]
+(defn fast-list-map
+  [afn alist]
   (let [ret (ArrayList.)]
     (fast-list-iter [e alist]
-      (.add ret (afn e)))
-    ret ))
+                    (.add ret (afn e)))
+    ret))
 
-(defmacro fast-list-for [[e alist] & body]
+(defmacro fast-list-for
+  [[e alist] & body]
   `(fast-list-map (fn [~e] ~@body) ~alist))
 
-(defn map-iter [^Map amap]
+(defn map-iter
+  [^Map amap]
   (if amap (-> amap .entrySet .iterator)))
 
-(defn convert-entry [^Map$Entry entry]
+(defn convert-entry
+  [^Map$Entry entry]
   [(.getKey entry) (.getValue entry)])
 
-(defmacro fast-map-iter [[bind amap] & body]
+(defmacro fast-map-iter
+  [[bind amap] & body]
   `(let [iter# (map-iter ~amap)]
-    (while (iter-has-next? iter#)
-      (let [entry# (iter-next iter#)
-            ~bind (convert-entry entry#)]
-        ~@body
-        ))))
+     (while (iter-has-next? iter#)
+       (let [entry# (iter-next iter#)
+             ~bind (convert-entry entry#)]
+         ~@body))))
 
-(defn fast-first [^List alist]
+(defn fast-first
+  [^List alist]
   (.get alist 0))
 
-(defmacro get-with-default [amap key default-val]
+(defmacro get-with-default
+  [amap key default-val]
   `(let [curr# (.get ~amap ~key)]
      (if curr#
        curr#
        (do
          (let [new# ~default-val]
            (.put ~amap ~key new#)
-           new#
-           )))))
+           new#)))))
 
-(defn fast-group-by [afn alist]
+(defn fast-group-by
+  [afn alist]
   (let [ret (HashMap.)]
-    (fast-list-iter [e alist]
+    (fast-list-iter
+      [e alist]
       (let [key (afn e)
             ^List curr (get-with-default ret key (ArrayList.))]
         (.add curr e)))
-    ret ))
+    ret))
 
-(defn new-instance [klass]
+(defn new-instance
+  [klass]
   (let [klass (if (string? klass) (Class/forName klass) klass)]
-    (.newInstance klass)
-    ))
+    (.newInstance klass)))
 
 (defmacro -<>
   ([x] x)
-  ([x form] (if (seq? form)
-              (with-meta
-                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
-                  (concat begin [x] end))
-                (meta form))
-              (list form x)))
-  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+  ([x form]
+   (if (seq? form)
+     (with-meta
+       (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+         (concat begin [x] end))
+       (meta form))
+     (list form x)))
+  ([x form & more]
+   `(-<> (-<> ~x ~form) ~@more)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2278fc96/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index ab3e0b2..46d1c69 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -13,6 +13,7 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+
 (ns backtype.storm.zookeeper
   (:import [org.apache.curator.retry RetryNTimes])
   (:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
@@ -31,47 +32,50 @@
   {Watcher$Event$KeeperState/Disconnected :disconnected
    Watcher$Event$KeeperState/SyncConnected :connected
    Watcher$Event$KeeperState/AuthFailed :auth-failed
-   Watcher$Event$KeeperState/Expired :expired
-  })
+   Watcher$Event$KeeperState/Expired :expired})
 
 (def zk-event-types
   {Watcher$Event$EventType/None :none
    Watcher$Event$EventType/NodeCreated :node-created
    Watcher$Event$EventType/NodeDeleted :node-deleted
    Watcher$Event$EventType/NodeDataChanged :node-data-changed
-   Watcher$Event$EventType/NodeChildrenChanged :node-children-changed
-  })
+   Watcher$Event$EventType/NodeChildrenChanged :node-children-changed})
 
-(defn- default-watcher [state type path]
+(defn- default-watcher
+  [state type path]
   (log-message "Zookeeper state update: " state type path))
 
-(defnk mk-client [conf servers port :root "" :watcher default-watcher :auth-conf nil]
+(defnk mk-client
+  [conf servers port
+   :root ""
+   :watcher default-watcher
+   :auth-conf nil]
   (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
     (.. fk
         (getCuratorListenable)
         (addListener
-         (reify CuratorListener
-           (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
-             (when (= (.getType e) CuratorEventType/WATCHED)                  
-               (let [^WatchedEvent event (.getWatchedEvent e)]
-                 (watcher (zk-keeper-states (.getState event))
-                          (zk-event-types (.getType event))
-                          (.getPath event))))))))
-;;    (.. fk
-;;        (getUnhandledErrorListenable)
-;;        (addListener
-;;         (reify UnhandledErrorListener
-;;           (unhandledError [this msg error]
-;;             (if (or (exception-cause? InterruptedException error)
-;;                     (exception-cause? java.nio.channels.ClosedByInterruptException error))
-;;               (do (log-warn-error error "Zookeeper exception " msg)
-;;                   (let [to-throw (InterruptedException.)]
-;;                     (.initCause to-throw error)
-;;                     (throw to-throw)
-;;                     ))
-;;               (do (log-error error "Unrecoverable Zookeeper error " msg)
-;;                   (halt-process! 1 "Unrecoverable Zookeeper error")))
-;;             ))))
+          (reify CuratorListener
+            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
+                   (when (= (.getType e) CuratorEventType/WATCHED)
+                     (let [^WatchedEvent event (.getWatchedEvent e)]
+                       (watcher (zk-keeper-states (.getState event))
+                                (zk-event-types (.getType event))
+                                (.getPath event))))))))
+    ;;    (.. fk
+    ;;        (getUnhandledErrorListenable)
+    ;;        (addListener
+    ;;         (reify UnhandledErrorListener
+    ;;           (unhandledError [this msg error]
+    ;;             (if (or (exception-cause? InterruptedException error)
+    ;;                     (exception-cause? java.nio.channels.ClosedByInterruptException error))
+    ;;               (do (log-warn-error error "Zookeeper exception " msg)
+    ;;                   (let [to-throw (InterruptedException.)]
+    ;;                     (.initCause to-throw error)
+    ;;                     (throw to-throw)
+    ;;                     ))
+    ;;               (do (log-error error "Unrecoverable Zookeeper error " msg)
+    ;;                   (halt-process! 1 "Unrecoverable Zookeeper error")))
+    ;;             ))))
     (.start fk)
     fk))
 
@@ -82,27 +86,30 @@
 
 (defn create-node
   ([^CuratorFramework zk ^String path ^bytes data mode]
-    (try
-      (.. zk (create) (withMode (zk-create-modes mode)) (withACL ZooDefs$Ids/OPEN_ACL_UNSAFE) (forPath (normalize-path path) data))
-      (catch Exception e (throw (wrap-in-runtime e)))))
+   (try
+     (.. zk (create) (withMode (zk-create-modes mode)) (withACL ZooDefs$Ids/OPEN_ACL_UNSAFE) (forPath (normalize-path path) data))
+     (catch Exception e (throw (wrap-in-runtime e)))))
   ([^CuratorFramework zk ^String path ^bytes data]
-    (create-node zk path data :persistent)))
+   (create-node zk path data :persistent)))
 
-(defn exists-node? [^CuratorFramework zk ^String path watch?]
+(defn exists-node?
+  [^CuratorFramework zk ^String path watch?]
   ((complement nil?)
-    (try
-      (if watch?
-         (.. zk (checkExists) (watched) (forPath (normalize-path path))) 
-         (.. zk (checkExists) (forPath (normalize-path path))))
-      (catch Exception e (throw (wrap-in-runtime e))))))
-
-(defnk delete-node [^CuratorFramework zk ^String path :force false]
+   (try
+     (if watch?
+       (.. zk (checkExists) (watched) (forPath (normalize-path path)))
+       (.. zk (checkExists) (forPath (normalize-path path))))
+     (catch Exception e (throw (wrap-in-runtime e))))))
+
+(defnk delete-node
+  [^CuratorFramework zk ^String path :force false]
   (try-cause  (.. zk (delete) (forPath (normalize-path path)))
-    (catch KeeperException$NoNodeException e
-      (when-not force (throw e)))
-    (catch Exception e (throw (wrap-in-runtime e)))))
+             (catch KeeperException$NoNodeException e
+               (when-not force (throw e)))
+             (catch Exception e (throw (wrap-in-runtime e)))))
 
-(defn mkdirs [^CuratorFramework zk ^String path]
+(defn mkdirs
+  [^CuratorFramework zk ^String path]
   (let [path (normalize-path path)]
     (when-not (or (= path "/") (exists-node? zk path false))
       (mkdirs zk (parent-path path))
@@ -113,59 +120,68 @@
           ))
       )))
 
-(defn get-data [^CuratorFramework zk ^String path watch?]
+(defn get-data
+  [^CuratorFramework zk ^String path watch?]
   (let [path (normalize-path path)]
     (try-cause
       (if (exists-node? zk path watch?)
         (if watch?
           (.. zk (getData) (watched) (forPath path))
           (.. zk (getData) (forPath path))))
-    (catch KeeperException$NoNodeException e
-      ;; this is fine b/c we still have a watch from the successful exists call
-      nil )
-    (catch Exception e (throw (wrap-in-runtime e))))))
+      (catch KeeperException$NoNodeException e
+        ;; this is fine b/c we still have a watch from the successful exists call
+        nil )
+      (catch Exception e (throw (wrap-in-runtime e))))))
 
-(defn get-children [^CuratorFramework zk ^String path watch?]
+(defn get-children
+  [^CuratorFramework zk ^String path watch?]
   (try
     (if watch?
       (.. zk (getChildren) (watched) (forPath (normalize-path path)))
       (.. zk (getChildren) (forPath (normalize-path path))))
     (catch Exception e (throw (wrap-in-runtime e)))))
 
-(defn set-data [^CuratorFramework zk ^String path ^bytes data]
+(defn set-data
+  [^CuratorFramework zk ^String path ^bytes data]
   (try
     (.. zk (setData) (forPath (normalize-path path) data))
     (catch Exception e (throw (wrap-in-runtime e)))))
 
-(defn exists [^CuratorFramework zk ^String path watch?]
+(defn exists
+  [^CuratorFramework zk ^String path watch?]
   (exists-node? zk path watch?))
 
-(defn delete-recursive [^CuratorFramework zk ^String path]
+(defn delete-recursive
+  [^CuratorFramework zk ^String path]
   (let [path (normalize-path path)]
     (when (exists-node? zk path false)
-      (let [children (try-cause (get-children zk path false)
-                                (catch KeeperException$NoNodeException e
-                                  []
-                                  ))]
+      (let [children (try-cause
+                       (get-children zk path false)
+                       (catch KeeperException$NoNodeException e []))]
         (doseq [c children]
           (delete-recursive zk (full-path path c)))
-        (delete-node zk path :force true)
-        ))))
+        (delete-node zk path :force true)))))
 
-(defnk mk-inprocess-zookeeper [localdir :port nil]
+(defnk mk-inprocess-zookeeper
+  [localdir :port nil]
   (let [localfile (File. localdir)
         zk (ZooKeeperServer. localfile localfile 2000)
-        [retport factory] (loop [retport (if port port 2000)]
-                            (if-let [factory-tmp (try-cause (doto (NIOServerCnxnFactory.) (.configure (InetSocketAddress. retport) 0))
-                                              (catch BindException e
-                                                (when (> (inc retport) (if port port 65535))
-                                                  (throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]
-                              [retport factory-tmp]
-                              (recur (inc retport))))]
-    (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir)  
 
+        [retport factory]
+        (loop [retport (if port port 2000)]
+          (if-let [factory-tmp
+                   (try-cause
+                     (doto (NIOServerCnxnFactory.)
+                       (.configure (InetSocketAddress. retport) 0))
+                     (catch BindException e
+                       (when (> (inc retport) (if port port 65535))
+                         (throw (RuntimeException.
+                                  "No port is available to launch an inprocess zookeeper.")))))]
+            [retport factory-tmp]
+            (recur (inc retport))))]
+    (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir)
     (.startup factory zk)
-    [retport factory]
-    ))
+    [retport factory]))
 
-(defn shutdown-inprocess-zookeeper [handle]
+(defn shutdown-inprocess-zookeeper
+  [handle]
   (.shutdown handle))


Mime
View raw message