storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [47/50] [abbrv] storm git commit: Addressing TODOs. Calling sync before calling getChildren on code-distributor path as zookeeper does not guarantee Simultaneously Consistent Cross-Client Views unless sync is called.
Date Mon, 24 Aug 2015 13:52:32 GMT
Addressing TODOs. Calling sync before calling getChildren on code-distributor path as zookeeper
does not guarantee Simultaneously Consistent Cross-Client Views unless sync is called.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93dbcafa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93dbcafa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93dbcafa

Branch: refs/heads/master
Commit: 93dbcafaa2e5e9fd4cdc9445d1104fb61ae286f2
Parents: ef3cee6
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Wed Aug 12 13:53:39 2015 -0700
Committer: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Committed: Wed Aug 12 13:53:39 2015 -0700

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj      | 17 ++++++++++++++---
 .../src/clj/backtype/storm/daemon/nimbus.clj       |  7 +++----
 2 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/93dbcafa/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 04f5b89..3884096 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -44,7 +44,8 @@
   (close [this])
   (register [this callback])
   (unregister [this id])
-  (add-listener [this listener]))
+  (add-listener [this listener])
+  (sync-path [this path]))
 
 (defn mk-topo-only-acls
   [topo-conf]
@@ -145,6 +146,10 @@
       (add-listener
         [this listener]
         (zk/add-listener zk listener))
+
+      (sync-path
+        [this path]
+        (zk/sync-path zk path))
       )))
 
 (defprotocol StormClusterState
@@ -365,7 +370,9 @@
         [this callback]
         (when callback
           (reset! code-distributor-callback callback))
-        (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback)))
+        (do
+          (sync-path cluster-state CODE-DISTRIBUTOR-SUBTREE)
+          (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback))))
 
       (nimbuses
         [this]
@@ -389,7 +396,11 @@
 
       (code-distributor-info
         [this storm-id]
-        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state
(code-distributor-path storm-id) false)))
+        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info))
+          (let [path (code-distributor-path storm-id)]
+            (do
+              (sync-path cluster-state path)
+              (get-children cluster-state path false)))))
 
       (active-storms
         [this]

http://git-wip-us.apache.org/repos/asf/storm/blob/93dbcafa/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index fa3d8ec..3ee4cdc 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1492,8 +1492,6 @@
         (.removeFromLeaderLockQueue (:leader-elector nimbus))
         (doseq [missing missing-topologies]
           (log-message "missing topology " missing " has state on zookeeper but doesn't have
a local dir on this host.")
-          ;; complete heck to get around zookeeper eventual consistency issue. zk/sync is
not helping us so adding a sleep.
-          (sleep-secs 5)
           (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
             (log-message "trying to download missing topology code from " (clojure.string/join
"," nimbuses-with-missing))
             (doseq [nimbus-host-port nimbuses-with-missing]
@@ -1501,8 +1499,9 @@
                 (try
                   (download-code conf nimbus missing (.getHost nimbus-host-port) (.getPort
nimbus-host-port))
                   (catch Exception e (log-error e "Exception while trying to syn-code for
missing topology" missing)))))))))
-    ;;TODO Ideally This should only be called if all missing topology code was successfully
downloaded.
-    (.addToLeaderLockQueue (:leader-elector nimbus))))
+
+    (if (empty? (set/difference active-topologies (set (code-ids (:conf nimbus)))))
+      (.addToLeaderLockQueue (:leader-elector nimbus)))))
 
 (defmethod sync-code :local [conf nimbus]
   nil)


Mime
View raw message