storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/9] git commit: Demo conservative ZK assignments (lots of logging)
Date Fri, 11 Jul 2014 21:18:15 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master 9e77cd237 -> 9e8c769a6


Demo conservative ZK assignments (lots of logging)


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b27b8c4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b27b8c4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b27b8c4f

Branch: refs/heads/master
Commit: b27b8c4fa015f19db6a087c3a565a703b39dc517
Parents: 8f2e749
Author: Kyle Nusbaum <knusbaum@yahoo-inc.com>
Authored: Thu Jun 26 21:13:56 2014 +0000
Committer: Kyle Nusbaum <knusbaum@yahoo-inc.com>
Committed: Thu Jun 26 21:13:56 2014 +0000

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj   | 31 ++++++++++++-
 .../src/clj/backtype/storm/daemon/common.clj    |  1 +
 .../clj/backtype/storm/daemon/supervisor.clj    | 46 ++++++++++++++++----
 storm-core/src/clj/backtype/storm/zookeeper.clj | 25 +++++++++++
 4 files changed, 93 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b27b8c4f/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index d3682ac..c5e6f16 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -29,6 +29,8 @@
   ;; if node does not exist, create persistent with this data
   (set-data [this path data])
   (get-data [this path watch?])
+  (get-version [this path watch?])
+  (get-data-with-version [this path watch?])
   (get-children [this path watch?])
   (mkdirs [this path])
   (close [this])
@@ -56,7 +58,6 @@
                                           (callback type path))))))]
     (reify
      ClusterState
-
      (register
        [this callback]
        (let [id (uuid)]
@@ -100,6 +101,14 @@
        [this path watch?]
        (zk/get-data zk path watch?))
 
+     (get-data-with-version
+       [this path watch?]
+       (zk/get-data-with-version zk path watch?))
+
+     (get-version 
+       [this path watch?]
+       (zk/get-version zk path watch?))
+
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
@@ -113,9 +122,12 @@
        (reset! active false)
        (.close zk)))))
 
+
 (defprotocol StormClusterState
   (assignments [this callback])
   (assignment-info [this storm-id callback])
+  (assignment-info-with-version [this storm-id callback])
+  (assignment-version [this storm-id callback])
   (active-storms [this])
   (storm-base [this storm-id callback])
   (get-worker-heartbeat [this storm-id node port])
@@ -225,6 +237,8 @@
                                 [false cluster-state-spec]
                                 [true (mk-distributed-cluster-state cluster-state-spec)])
         assignment-info-callback (atom {})
+        assignment-info-with-version-callback (atom {})
+        assignment-version-callback (atom {})
         supervisors-callback (atom nil)
         assignments-callback (atom nil)
         storm-base-callback (atom {})
@@ -257,6 +271,21 @@
           (swap! assignment-info-callback assoc storm-id callback))
         (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
 
+      (assignment-info-with-version 
+        [this storm-id callback]
+        (when callback
+          (swap! assignment-info-with-version-callback assoc storm-id callback))
+        (let [{data :data version :version} 
+              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
+        {:data (maybe-deserialize data)
+         :version version}))
+
+      (assignment-version 
+        [this storm-id callback]
+        (when callback
+          (swap! assignment-version-callback assoc storm-id callback))
+        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
+
       (active-storms
         [this]
         (get-children cluster-state STORMS-SUBTREE false))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b27b8c4f/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index 43746b3..f2ad7ce 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -61,6 +61,7 @@
 (def LS-ID "supervisor-id")
 (def LS-LOCAL-ASSIGNMENTS "local-assignments")
 (def LS-APPROVED-WORKERS "approved-workers")
+(def LS-ASSIGNMENT-VERSIONS "local-assignment-versions")
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b27b8c4f/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 414d894..3b2a6b5 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -35,13 +35,33 @@
   (shutdown-all-workers [this])
   )
 
-(defn- assignments-snapshot [storm-cluster-state callback]
+(defn- assignments-snapshot [storm-cluster-state callback existing-assignment assignment-versions]
+  (log-message (str "Recalculating assignments with old: " assignment-versions))
   (let [storm-ids (.assignments storm-cluster-state callback)]
-     (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)})
-          (apply merge)
-          (filter-val not-nil?)
-          )))
-
+    (let [new-assignments 
+          (->>
+           (dofor [sid storm-ids] 
+                  (let [recorded-version (:version (get assignment-versions sid))]
+                    (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
+                      (do
+                        (log-message (str "Version: " assignment-version " || Recorded Version: " recorded-version))
+                        (if (= assignment-version recorded-version)
+                          (do
+                            (log-message "Using Existing assignment.")
+                            {sid (get assignment-versions sid)})
+                          (do
+                            (log-message "Getting new Assignments.")
+                            (let [assignments (.assignment-info-with-version storm-cluster-state sid callback)]
+                              (log-message (str "Assignments: " assignments))
+                              {sid assignments}))))
+                      {sid nil})))
+           (apply merge)
+           (filter-val not-nil?))]
+
+      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
+       :versions new-assignments})))
+            
+  
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
         my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
@@ -297,7 +317,13 @@
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
           sync-callback (fn [& ignored] (.add event-manager this))
-          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
+          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)
+          assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS)
+          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot 
+                                                                   storm-cluster-state sync-callback
+                                                                   existing-assignment assignment-versions)
+          _ (log-message (str "Got Assignments: " assignments-snapshot
+                              " || And Versions: " (pr-str versions)))
           storm-code-map (read-storm-code-locations assignments-snapshot)
           downloaded-storm-ids (set (read-downloaded-storm-ids conf))
           all-assignment (read-assignments
@@ -305,8 +331,7 @@
                            (:assignment-id supervisor))
           new-assignment (->> all-assignment
                               (filter-key #(.confirmAssigned isupervisor %)))
-          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
-          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
+          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
       (log-debug "Downloaded storm ids: " downloaded-storm-ids)
@@ -340,6 +365,9 @@
       (.put local-state
             LS-LOCAL-ASSIGNMENTS
             new-assignment)
+      (.put local-state
+            LS-ASSIGNMENT-VERSIONS
+            versions)
       (reset! (:curr-assignment supervisor) new-assignment)
       ;; remove any downloaded code that's no longer assigned or active
       ;; important that this happens after setting the local assignment so that

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b27b8c4f/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index 46d1c69..8a97be8 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -133,6 +133,31 @@
         nil )
       (catch Exception e (throw (wrap-in-runtime e))))))
 
+(defn get-data-with-version 
+  [^CuratorFramework zk ^String path watch?]
+  (let [stats (org.apache.zookeeper.data.Stat. )
+        path (normalize-path path)]
+    (try-cause
+     (if-let [data 
+              (if (exists-node? zk path watch?)
+                (if watch?
+                  (.. zk (getData) (watched) (storingStatIn stats) (forPath path))
+                  (.. zk (getData) (storingStatIn stats) (forPath path))))]
+       {:data data
+        :version (.getVersion stats)})
+     (catch KeeperException$NoNodeException e
+       ;; this is fine b/c we still have a watch from the successful exists call
+       nil ))))
+
+(defn get-version 
+[^CuratorFramework zk ^String path watch?]
+  (if-let [stats
+           (if watch?
+             (.. zk (checkExists) (watched) (forPath (normalize-path path)))
+             (.. zk (checkExists) (forPath (normalize-path path))))]
+    (.getVersion stats)
+    nil))
+
 (defn get-children
   [^CuratorFramework zk ^String path watch?]
   (try


Mime
View raw message