storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/2] storm git commit: STORM-2740: Add in caching of topology and conf to nimbus
Date Mon, 18 Sep 2017 14:39:34 GMT
Repository: storm
Updated Branches:
  refs/heads/master 05a74c73d -> da2f03586


STORM-2740: Add in caching of topology and conf to nimbus


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac8d37b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac8d37b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac8d37b9

Branch: refs/heads/master
Commit: ac8d37b9fdb43197b143901bd6d3212601265bca
Parents: 124acb9
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Thu Sep 14 17:02:24 2017 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Fri Sep 15 11:15:06 2017 -0500

----------------------------------------------------------------------
 .../org/apache/storm/blobstore/BlobStore.java   |  65 +++--
 .../src/jvm/org/apache/storm/utils/Utils.java   |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  72 +++---
 .../apache/storm/security/auth/auth_test.clj    |   2 +-
 .../storm/security/auth/nimbus_auth_test.clj    |  14 +-
 .../java/org/apache/storm/LocalCluster.java     |  18 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 130 +++++-----
 .../apache/storm/daemon/nimbus/TopoCache.java   | 244 +++++++++++++++++++
 .../storm/zookeeper/LeaderElectorImp.java       |  10 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |  27 +-
 11 files changed, 446 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index bb177a1..406ac8b 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -27,23 +27,20 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import javax.security.auth.Subject;
-
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Provides a way to store blobs that can be downloaded.
@@ -232,6 +229,30 @@ public abstract class BlobStore implements Shutdownable {
      * Wrapper called to create the blob which contains
      * the byte data
      * @param key Key for the blob.
+     * @param data Byte data that needs to be uploaded.
+     * @param who Is the subject updating the blob.
+     * @throws AuthorizationException
+     * @throws IOException
+     * @throws KeyNotFoundException
+     */
+    public void updateBlob(String key, byte [] data, Subject who) throws AuthorizationException, IOException, KeyNotFoundException {
+        AtomicOutputStream out = null;
+        try {
+            out = updateBlob(key, who);
+            out.write(data);
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                out.cancel();
+            }
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob which contains
+     * the byte data
+     * @param key Key for the blob.
      * @param in InputStream from which the data is read to be
      * written as a part of the blob.
      * @param meta Metadata which contains the acls information
@@ -305,32 +326,6 @@ public abstract class BlobStore implements Shutdownable {
         out.close();
         return bytes;
     }
-
-    /**
-     * Helper method to read a stored topology
-     * @param topoId the id of the topology to read
-     * @param who who to read it as
-     * @return the deserialized topology.
-     * @throws IOException on any error while reading the blob.
-     * @throws AuthorizationException if who is not allowed to read the blob
-     * @throws KeyNotFoundException if the blob could not be found
-     */
-    public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
-        return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class);
-    }
-    
-    /**
-     * Helper method to read a stored topology config
-     * @param topoId the id of the topology whose conf we are reading
-     * @param who who we are reading this as
-     * @return the deserialized config
-     * @throws KeyNotFoundException if the blob could not be found
-     * @throws AuthorizationException if who is not allowed to read the blob
-     * @throws IOException on any error while reading the blob.
-     */
-    public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
-        return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who));
-    }
     
     private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key);
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index a8820e6..a028935 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1104,21 +1104,11 @@ public class Utils {
     public static long getVersionFromBlobVersionFile(File versionFile) {
         long currentVersion = 0;
         if (versionFile.exists() && !(versionFile.isDirectory())) {
-            BufferedReader br = null;
-            try {
-                br = new BufferedReader(new FileReader(versionFile));
+            try (BufferedReader br = new BufferedReader(new FileReader(versionFile))) {
                 String line = br.readLine();
                 currentVersion = Long.parseLong(line);
             } catch (IOException e) {
                 throw new RuntimeException(e);
-            } finally {
-                try {
-                    if (br != null) {
-                        br.close();
-                    }
-                } catch (Exception ignore) {
-                    LOG.error("Exception trying to cleanup", ignore);
-                }
             }
             return currentVersion;
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 4b4bc37..8aee299 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -25,8 +25,8 @@
 
 (defn -main [^String tmpjarpath & args]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
-        zk-leader-elector (Zookeeper/zkLeaderElector conf nil)
+        ; since the purpose here is not to join the leader lock queue, passing nil as blob-store and topo cache is ok
+        zk-leader-elector (Zookeeper/zkLeaderElector conf nil nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index c4f3fad..dcea44e 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -21,7 +21,7 @@
             TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
            [org.apache.storm.blobstore BlobStore]
            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
-           [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
+           [org.apache.storm.daemon.nimbus TopoCache Nimbus Nimbus$StandaloneINimbus]
            [org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
            [org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
            [org.apache.storm.stats BoltExecutorStats StatsUtil]
@@ -56,7 +56,7 @@
 
 (defn- mk-nimbus
   [conf inimbus blob-store leader-elector group-mapper cluster-state]
-  (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper))
+  (Nimbus. conf inimbus cluster-state nil blob-store nil leader-elector group-mapper))
 
 (defn- from-json
        [^String str]
@@ -1307,7 +1307,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+                      (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1324,7 +1324,7 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. false))))]
+                          (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. false))))]
 
             (letlocals
               (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1384,11 +1384,13 @@
 
 (deftest test-nimbus-iface-methods-check-authorization
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             topology-name "test"
@@ -1403,11 +1405,13 @@
 
 (deftest test-nimbus-check-authorization-params
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
     (let [nimbus (.getNimbus cluster)
@@ -1418,8 +1422,8 @@
           expected-conf {TOPOLOGY-NAME expected-name
                          "foo" "bar"}]
       (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
-      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
-      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
+      (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) expected-conf)
+      (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/anyObject))) nil)
       (testing "getTopologyConf calls check-authorization! with the correct parameters."
       (let [expected-operation "getTopologyConf"]
           (try
@@ -1456,15 +1460,17 @@
             (finally
               (.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
               ;;One for this time and one for getTopology call
-              (.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
+              (.readTopology (Mockito/verify tc (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
 
 (deftest test-check-authorization-getSupervisorPageInfo
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
     (let [nimbus (.getNimbus cluster)
@@ -1494,8 +1500,8 @@
                             (.put "super2" (doto (SupervisorInfo.) (.set_hostname "host2") (.set_meta [(long 1234)])
                                              (.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))))]
       (.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
-      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf)
-      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+      (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) expected-conf)
+      (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
       (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
       (.getSupervisorPageInfo nimbus "super1" nil true)
 
@@ -1550,10 +1556,12 @@
 
 (deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
+                            (.withTopoCache tc)
                             (.withBlobStore blob-store)))]
     (let [nimbus (.getNimbus cluster)
           bogus-secs 42
@@ -1576,8 +1584,8 @@
         ]
       (.thenReturn (Mockito/when (.stormBase cluster-state (Mockito/any String) (Mockito/anyObject))) storm-base)
       (.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases)
-      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf)
-      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+      (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) topo-conf)
+      (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
 
       (let [topos (.get_topologies (.getClusterInfo nimbus))]
         ; The number of topologies in the summary is correct.
@@ -1625,7 +1633,7 @@
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf blob-store] nil)))
+                          (zkLeaderElectorImpl [conf blob-store tc] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
           (Nimbus. auth-conf fake-inimbus)
           (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
@@ -1643,11 +1651,13 @@
 
 (deftest test-validate-topo-config-on-submit
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
       (let [topology (Thrift/buildTopology {} {})
@@ -1694,7 +1704,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1756,11 +1766,13 @@
 ;; log configs it contains are LogLevelAction/UNCHANGED
 (deftest empty-save-config-results-in-all-unchanged-actions
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             previous-config (LogConfig.)
@@ -1777,7 +1789,7 @@
             (.set_target_log_level "ERROR")
             (.set_action LogLevelAction/UNCHANGED)))
 
-        (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+        (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
         (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
 
         (.setLogConfig nimbus "foo" mock-config)
@@ -1785,11 +1797,13 @@
 
 (deftest log-level-update-merges-and-flags-existent-log-level
   (let [cluster-state (Mockito/mock IStormClusterState)
-        blob-store (Mockito/mock BlobStore)]
+        blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)]
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder. )
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
       (let [nimbus (.getNimbus cluster)
             previous-config (LogConfig.)
@@ -1823,7 +1837,7 @@
             (.set_target_log_level "DEBUG")
             (.set_action LogLevelAction/UNCHANGED)))
 
-        (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+        (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
         (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
 
         (.setLogConfig nimbus "foo" mock-config)
@@ -1883,7 +1897,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
@@ -1928,7 +1942,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
@@ -1960,7 +1974,8 @@
         assignments {"topo1" assignment, "topo2" assignment2}
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store (MockLeaderElector. ) nil)]
+        mock-tc (Mockito/mock TopoCache)
+        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
     (let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
           user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies))
           supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
@@ -1980,9 +1995,10 @@
         assignments {"topo1" assignment, "authorized" assignment2}
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
-        nimbus (Nimbus. {} nil mock-state nil mock-blob-store (MockLeaderElector. ) nil)]
-    (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
-    (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
+        mock-tc (Mockito/mock TopoCache)
+        nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
+    (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
+    (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
     (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))
     (let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
           user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))]

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index 149a0f2..f7aac52 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -61,7 +61,7 @@
 
 (defn nimbus-data [storm-conf inimbus]
   (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                  (zkLeaderElectorImpl [conf blob-store] (Mockito/mock ILeaderElector))))]
+                  (zkLeaderElectorImpl [conf blob-store tc] (Mockito/mock ILeaderElector))))]
     (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil)))
 
 (defn dummy-service-handler

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 83a3267..4b2d085 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -20,13 +20,13 @@
   (:import [java.util Optional])
   (:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
   (:import [org.apache.storm.blobstore BlobStore])
-  (:import [org.apache.storm.utils NimbusClient])
+  (:import [org.apache.storm.daemon.nimbus TopoCache])
   (:import [org.apache.storm.generated NotAliveException StormBase])
   (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient
                                          ReqContext ThriftConnectionType])
   (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Processor
             AuthorizationException SubmitOptions TopologyInitialStatus KillOptions])
-  (:import [org.apache.storm.utils ConfigUtils Utils])
+  (:import [org.apache.storm.utils ConfigUtils NimbusClient Utils])
   (:import [org.apache.storm.cluster IStormClusterState])
   (:import [org.mockito Mockito Matchers])
   (:use [org.apache.storm util config daemon-config log])
@@ -64,12 +64,14 @@
 (deftest test-noop-authorization-w-simple-transport
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)
         topo-name "topo-name"]
     (.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/empty))
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder.)
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
@@ -88,14 +90,16 @@
 (deftest test-deny-authorization-w-simple-transport
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)
         topo-name "topo-name"
         topo-id "topo-name-1"]
     (.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
-    (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+    (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder.)
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
@@ -150,14 +154,16 @@
 (deftest test-deny-authorization-w-sasl-digest
   (let [cluster-state (Mockito/mock IStormClusterState)
         blob-store (Mockito/mock BlobStore)
+        tc (Mockito/mock TopoCache)
         topo-name "topo-name"
         topo-id "topo-name-1"]
     (.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
-    (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+    (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
     (with-open [cluster (.build
                           (doto (LocalCluster$Builder.)
                             (.withClusterState cluster-state)
                             (.withBlobStore blob-store)
+                            (.withTopoCache tc)
                             (.withNimbusDaemon)
                             (.withDaemonConf
                                {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 4fb4474..fa49d12 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -43,6 +43,7 @@ import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.nimbus.Nimbus;
 import org.apache.storm.daemon.nimbus.Nimbus.StandaloneINimbus;
+import org.apache.storm.daemon.nimbus.TopoCache;
 import org.apache.storm.daemon.supervisor.ReadClusterState;
 import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
 import org.apache.storm.daemon.supervisor.Supervisor;
@@ -140,6 +141,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         private boolean nimbusDaemon = false;
         private UnaryOperator<Nimbus> nimbusWrapper = null;
         private BlobStore store = null;
+        private TopoCache topoCache = null;
         private IStormClusterState clusterState = null;
         private ILeaderElector leaderElector = null;
         private String trackId = null;
@@ -278,7 +280,16 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             this.store = store;
             return this;
         }
-        
+
+        /**
+         * Use the following topo cache instead of creating our own.
+         * This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withTopoCache(TopoCache topoCache) {
+            this.topoCache = topoCache;
+            return this;
+        }
+
         /**
          * Use the following clusterState instead of the one in the config.
          * This is intended mostly for internal testing with Mocks.
@@ -430,8 +441,9 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             }
             //Set it for nimbus only
             conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
-            Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus, 
-                this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
+            Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
+                this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector,
+                builder.groupMapper);
             if (builder.nimbusWrapper != null) {
                 nimbus = builder.nimbusWrapper.apply(nimbus);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index db2550d..00397a6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -250,7 +250,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         if (killTime != null) {
             delay = ((Number)killTime).intValue();
         } else {
-            delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+            delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
         nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
         StormBase sb = new StormBase();
@@ -271,7 +271,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         if (rbo.is_set_wait_secs()) {
             delay = rbo.get_wait_secs();
         } else {
-            delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+            delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
         nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
         
@@ -549,8 +549,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ReqContext.context().subject();
     }
     
-    static Map<String, Object> readTopoConf(String topoId, BlobStore blobStore) throws KeyNotFoundException, AuthorizationException, IOException {
-        return blobStore.readTopologyConf(topoId, getSubject());
+    static Map<String, Object> readTopoConf(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+        return tc.readTopoConf(topoId, getSubject());
     }
     
     static List<String> getKeyListFromId(Map<String, Object> conf, String id) {
@@ -568,16 +568,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return kseq.getKeySequenceNumber(conf);
     }
     
-    private static StormTopology readStormTopology(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
-        return store.readTopology(topoId, getSubject());
+    private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+        return tc.readTopology(topoId, getSubject());
     }
     
-    private static Map<String, Object> readTopoConfAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
-        return store.readTopologyConf(topoId, NIMBUS_SUBJECT);
+    private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+        return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
     }
     
-    private static StormTopology readStormTopologyAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
-        return store.readTopology(topoId, NIMBUS_SUBJECT);
+    private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+        return tc.readTopology(topoId, NIMBUS_SUBJECT);
     }
     
     /**
@@ -720,9 +720,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return state.getTopoId(topoName).isPresent();
     }
     
-    private static Map<String, Object> tryReadTopoConf(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
+    private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
         try {
-            return readTopoConfAsNimbus(topoId, store);
+            return readTopoConfAsNimbus(topoId, tc);
             //Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
             // so it is not needed
         } catch (KeyNotFoundException e) {
@@ -873,9 +873,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
     }
     
-    private static StormTopology tryReadTopology(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
+    private static StormTopology tryReadTopology(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
         try {
-            return readStormTopologyAsNimbus(topoId, store);
+            return readStormTopologyAsNimbus(topoId, tc);
         } catch (KeyNotFoundException e) {
             throw new NotAliveException(topoId);
         }
@@ -1033,6 +1033,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     @SuppressWarnings("deprecation")
     private final TimeCacheMap<String, WritableByteChannel> uploaders;
     private final BlobStore blobStore;
+    private final TopoCache topoCache;
     @SuppressWarnings("deprecation")
     private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
     @SuppressWarnings("deprecation")
@@ -1070,7 +1071,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
-            BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
+                  BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
+        this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, leaderElector, groupMapper);
+    }
+
+    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
+            BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper)
+        throws Exception {
         this.conf = conf;
         if (hostPortInfo == null) {
             hostPortInfo = NimbusInfo.fromConf(conf);
@@ -1095,6 +1102,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo);
         }
         this.blobStore = blobStore;
+        if (topoCache == null) {
+            topoCache = new TopoCache(blobStore, conf);
+        }
+        this.topoCache = topoCache;
         this.blobDownloaders = makeBlobCacheMap(conf);
         this.blobUploaders = makeBlobCacheMap(conf);
         this.blobListers = makeBlobListCachMap(conf);
@@ -1106,7 +1117,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         });
         this.scheduler = makeScheduler(conf, inimbus);
         if (leaderElector == null) {
-            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore);
+            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore, topoCache);
         }
         this.leaderElector = leaderElector;
         this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
@@ -1148,7 +1159,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private BlobStore getBlobStore() {
         return blobStore;
     }
-    
+
+    private TopoCache getTopoCache() {
+        return topoCache;
+    }
+
     private boolean isLeader() throws Exception {
         return leaderElector.isLeader();
     }
@@ -1254,13 +1269,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 clusterState.setupBlobstore(jarKey, hostPortInfo, getVersionForKey(jarKey, hostPortInfo, conf));
             }
         }
-        
-        store.createBlob(confKey, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
+
+        topoCache.addTopoConf(topoId, subject, topoConf);
         if (store instanceof LocalFsBlobStore) {
             clusterState.setupBlobstore(confKey, hostPortInfo, getVersionForKey(confKey, hostPortInfo, conf));
         }
-        
-        store.createBlob(codeKey, Utils.serialize(topology), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
+
+        topoCache.addTopology(topoId, subject, topology);
         if (store instanceof LocalFsBlobStore) {
             clusterState.setupBlobstore(codeKey, hostPortInfo, getVersionForKey(codeKey, hostPortInfo, conf));
         }
@@ -1317,10 +1332,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
       AuthorizationException, IOException, InvalidTopologyException {
         assert (base != null);
         assert (topoId != null);
-        
-        BlobStore store = blobStore;
-        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
-        StormTopology topo = readStormTopologyAsNimbus(topoId, store);
+
+        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
+        StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
         if (!base.is_set_principal()) {
             fixupBase(base, topoConf);
             stormClusterState.updateStorm(topoId, base);
@@ -1387,12 +1401,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private List<List<Integer>> computeExecutors(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
-        BlobStore store = blobStore;
         assert (base != null);
 
         Map<String, Integer> compToExecutors = base.get_component_executors();
-        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
-        StormTopology topology = readStormTopologyAsNimbus(topoId, store);
+        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
+        StormTopology topology = readStormTopologyAsNimbus(topoId, topoCache);
         List<List<Integer>> ret = new ArrayList<>();
         if (compToExecutors != null) {
             Map<Integer, String> taskInfo = StormCommon.stormTaskInfo(topology, topoConf);
@@ -1413,10 +1426,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private Map<List<Integer>, String> computeExecutorToComponent(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
-        BlobStore store = blobStore;
         List<List<Integer>> executors = computeExecutors(topoId, base);
-        StormTopology topology = readStormTopologyAsNimbus(topoId, store);
-        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
+        StormTopology topology = readStormTopologyAsNimbus(topoId, topoCache);
+        Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
         Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(topology, topoConf);
         Map<List<Integer>, String> ret = new HashMap<>();
         for (List<Integer> executor: executors) {
@@ -1869,9 +1881,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
         assert(TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus);
         IStormClusterState state = stormClusterState;
-        BlobStore store = blobStore;
-        Map<String, Object> topoConf = readTopoConf(topoId, store);
-        StormTopology topology = StormCommon.systemTopology(topoConf, readStormTopology(topoId, store));
+        Map<String, Object> topoConf = readTopoConf(topoId, topoCache);
+        StormTopology topology = StormCommon.systemTopology(topoConf, readStormTopology(topoId, topoCache));
         Map<String, Integer> numExecutors = new HashMap<>();
         for (Entry<String, Object> entry: StormCommon.allComponents(topology).entrySet()) {
             numExecutors.put(entry.getKey(), StormCommon.numStartExecutors(entry.getValue()));
@@ -1906,7 +1917,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         IStormClusterState state = stormClusterState;
         String topoId = state.getTopoId(topoName)
                 .orElseThrow(() -> new NotAliveException(topoName + " is not alive"));
-        return tryReadTopoConf(topoId, blobStore);
+        return tryReadTopoConf(topoId, topoCache);
     }
 
     @VisibleForTesting
@@ -1959,7 +1970,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
-        Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+        Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
         topoConf = merge(conf, topoConf);
         String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
         try {
@@ -1986,7 +1997,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             BlobStore store = blobStore;
             IStormClusterState state = stormClusterState;
-            StormTopology topo = readStormTopologyAsNimbus(topoId, store);
+            StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
             List<String> dependencyJars = topo.get_dependency_jars();
             LOG.info("Removing dependency jars from blobs - {}", dependencyJars);
             if (dependencyJars != null && !dependencyJars.isEmpty()) {
@@ -2004,9 +2015,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public void rmTopologyKeys(String topoId) {
         BlobStore store = blobStore;
         IStormClusterState state = stormClusterState;
+        try {
+            topoCache.deleteTopoConf(topoId, NIMBUS_SUBJECT);
+        } catch (Exception e) {
+            //Just go on and try to delete the others
+        }
+        try {
+            topoCache.deleteTopology(topoId, NIMBUS_SUBJECT);
+        } catch (Exception e) {
+            //Just go on and try to delete the others
+        }
         rmBlobKey(store, ConfigUtils.masterStormJarKey(topoId), state);
-        rmBlobKey(store, ConfigUtils.masterStormConfKey(topoId), state);
-        rmBlobKey(store, ConfigUtils.masterStormCodeKey(topoId), state);
     }
 
     @VisibleForTesting
@@ -2139,7 +2158,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             return;
         }
         IStormClusterState state = stormClusterState;
-        BlobStore store = blobStore;
         Collection<ICredentialsRenewer> renewers = credRenewers;
         Object lock = credUpdateLock;
         Map<String, StormBase> assignedBases = state.topologyBases();
@@ -2147,7 +2165,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
                 String id = entry.getKey();
                 String ownerPrincipal = entry.getValue().get_principal();
-                Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store)));
+                Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, topoCache)));
                 synchronized(lock) {
                     Credentials origCreds = state.credentials(id, null);
                     if (origCreds != null) {
@@ -2261,7 +2279,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             TopologySummary summary = new TopologySummary(topoId, base.get_name(), numTasks, numExecutors, numWorkers,
                     Time.deltaSecs(base.get_launch_time_secs()), extractStatusStr(base));
             try {
-                StormTopology topo = tryReadTopology(topoId, blobStore);
+                StormTopology topo = tryReadTopology(topoId, topoCache);
                 if (topo != null && topo.is_set_storm_version()) {
                     summary.set_storm_version(topo.get_storm_version());
                 }
@@ -2313,13 +2331,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     private CommonTopoInfo getCommonTopoInfo(String topoId, String operation) throws NotAliveException, AuthorizationException, IOException, InvalidTopologyException {
-        BlobStore store = blobStore;
         IStormClusterState state = stormClusterState;
         CommonTopoInfo ret = new CommonTopoInfo();
-        ret.topoConf = tryReadTopoConf(topoId, store);
+        ret.topoConf = tryReadTopoConf(topoId, topoCache);
         ret.topoName = (String)ret.topoConf.get(Config.TOPOLOGY_NAME);
         checkAuthorization(ret.topoName, ret.topoConf, operation);
-        ret.topology = tryReadTopology(topoId, store);
+        ret.topology = tryReadTopology(topoId, topoCache);
         ret.taskToComponent = StormCommon.stormTaskInfo(ret.topology, ret.topoConf);
         ret.base = state.stormBase(topoId, null);
         if (ret.base != null && ret.base.is_set_launch_time_secs()) {
@@ -2747,7 +2764,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public void setLogConfig(String topoId, LogConfig config) throws TException {
         try {
             setLogConfigCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
             topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setLogConfig");
@@ -2804,7 +2821,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public LogConfig getLogConfig(String topoId) throws TException {
         try {
             getLogConfigCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
             topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getLogConfig");
@@ -2830,7 +2847,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             IStormClusterState state = stormClusterState;
             String topoId = toTopoId(topoName);
-            Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
             topoConf = merge(conf, topoConf);
             // make sure samplingPct is within bounds.
             double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
@@ -2871,7 +2888,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public void setWorkerProfiler(String topoId, ProfileRequest profileRequest) throws TException {
         try {
             setWorkerProfilerCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
             topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setWorkerProfiler");
@@ -2943,7 +2960,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (topoId == null) {
                 throw new NotAliveException(topoName + " is not alive");
             }
-            Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
             topoConf = merge(conf, topoConf);
             if (credentials == null) {
                 credentials = new Credentials(Collections.emptyMap());
@@ -3750,7 +3767,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public String getTopologyConf(String id) throws NotAliveException, AuthorizationException, TException {
         try {
             getTopologyConfCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
             Map<String, Object> checkConf = merge(conf, topoConf);
             String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, checkConf, "getTopologyConf");
@@ -3768,11 +3785,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public StormTopology getTopology(String id) throws NotAliveException, AuthorizationException, TException {
         try {
             getTopologyCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
             topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getTopology");
-            return StormCommon.systemTopology(topoConf, tryReadTopology(id, blobStore));
+            return StormCommon.systemTopology(topoConf, tryReadTopology(id, topoCache));
         } catch (Exception e) {
             LOG.warn("Get topology exception. (topology id='{}')", id, e);
             if (e instanceof TException) {
@@ -3786,11 +3803,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException {
         try {
             getUserTopologyCalls.mark();
-            Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+            Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
             topoConf = merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getUserTopology");
-            return tryReadTopology(id, blobStore);
+            return tryReadTopology(id, topoCache);
         } catch (Exception e) {
             LOG.warn("Get user topology exception. (topology id='{}')", id, e);
             if (e instanceof TException) {
@@ -3806,12 +3823,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             List<String> adminUsers = (List<String>) conf.getOrDefault(Config.NIMBUS_ADMINS, Collections.emptyList());
             IStormClusterState state = stormClusterState;
-            BlobStore store = blobStore;
             List<String> assignedIds = state.assignments(null);
             Set<String> ret = new HashSet<>();
             boolean isAdmin = adminUsers.contains(user);
             for (String topoId: assignedIds) {
-                Map<String, Object> topoConf = tryReadTopoConf(topoId, store);
+                Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
                 topoConf = merge(conf, topoConf);
                 List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
                 List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
new file mode 100644
index 0000000..c7387f9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache topologies and topology confs from the blob store.
+ * Makes reading this faster because it can skip
+ * deserialization in many cases.
+ */
+public class TopoCache {
+    public static final Logger LOG = LoggerFactory.getLogger(TopoCache.class);
+
+    private static final class WithAcl<T> {
+        public final List<AccessControl> acl;
+        public final T data;
+
+        public WithAcl(List<AccessControl> acl, T data) {
+            this.acl = acl;
+            this.data = data;
+        }
+    }
+
+    private final BlobStore store;
+    private final BlobStoreAclHandler aclHandler;
+    private final ConcurrentHashMap<String, WithAcl<StormTopology>> topos = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, WithAcl<Map<String, Object>>> confs = new ConcurrentHashMap<>();
+
+    public TopoCache(BlobStore store, Map<String, Object> conf) {
+        this.store = store;
+        aclHandler = new BlobStoreAclHandler(conf);
+    }
+
+    /**
+     * Read a topology.
+     * @param topoId the id of the topology to read
+     * @param who who to read it as
+     * @return the deserialized topology.
+     * @throws IOException on any error while reading the blob.
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws KeyNotFoundException if the blob could not be found
+     */
+    public StormTopology readTopology(final String topoId, final Subject who)
+        throws KeyNotFoundException, AuthorizationException, IOException {
+        final String key = ConfigUtils.masterStormCodeKey(topoId);
+        WithAcl<StormTopology> cached = topos.get(topoId);
+        if (cached == null) {
+            //We need to read a new one
+            StormTopology topo = Utils.deserialize(store.readBlob(key, who), StormTopology.class);
+            ReadableBlobMeta meta = store.getBlobMeta(key, who);
+            cached = new WithAcl<>(meta.get_settable().get_acl(), topo);
+            WithAcl<StormTopology> previous = topos.putIfAbsent(topoId, cached);
+            if (previous != null) {
+                cached = previous;
+            }
+        } else {
+            //Check if the user is allowed to read this
+            aclHandler.hasPermissions(cached.acl, READ, who, key);
+        }
+        return cached.data;
+    }
+
+    /**
+     * Delete a topology when we are done.
+     * @param topoId the id of the topology
+     * @param who who is deleting it
+     * @throws AuthorizationException if who is not allowed to delete the blob
+     * @throws KeyNotFoundException if the blob could not be found
+     */
+    public void deleteTopology(final String topoId, final Subject who) throws AuthorizationException, KeyNotFoundException {
+        final String key = ConfigUtils.masterStormCodeKey(topoId);
+        store.deleteBlob(key, who);
+        topos.remove(topoId);
+    }
+
+    /**
+     * Add a new topology.
+     * @param topoId the id of the topology
+     * @param who who is doing it
+     * @param topo the topology itself
+     * @throws AuthorizationException if who is not allowed to add a topology
+     * @throws KeyAlreadyExistsException if the topology already exists
+     * @throws IOException on any error interacting with the blob store
+     */
+    public void addTopology(final String topoId, final Subject who, final StormTopology topo)
+        throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        final String key = ConfigUtils.masterStormCodeKey(topoId);
+        final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+        store.createBlob(key, Utils.serialize(topo), new SettableBlobMeta(acl), who);
+        topos.put(topoId, new WithAcl<>(acl, topo));
+    }
+
+    /**
+     * Update an existing topology.
+     * @param topoId the id of the topology
+     * @param who who is doing it
+     * @param topo the new topology to save
+     * @throws AuthorizationException if who is not allowed to update a topology
+     * @throws KeyNotFoundException if the topology is not found in the blob store
+     * @throws IOException on any error interacting with the blob store
+     */
+    public void updateTopology(final String topoId, final Subject who, final StormTopology topo)
+        throws AuthorizationException, KeyNotFoundException, IOException {
+        final String key = ConfigUtils.masterStormCodeKey(topoId);
+        store.updateBlob(key, Utils.serialize(topo), who);
+        List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+        WithAcl<StormTopology> old = topos.get(topoId);
+        if (old != null) {
+            acl = old.acl;
+        } else {
+            acl = store.getBlobMeta(key, who).get_settable().get_acl();
+        }
+        topos.put(topoId, new WithAcl<>(acl, topo));
+    }
+
+    /**
+     * Read a topology conf.
+     * @param topoId the id of the topology to read the conf for
+     * @param who who to read it as
+     * @return the deserialized config.
+     * @throws IOException on any error while reading the blob.
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws KeyNotFoundException if the blob could not be found
+     */
+    public Map<String, Object> readTopoConf(final String topoId, final Subject who)
+        throws KeyNotFoundException, AuthorizationException, IOException {
+        final String key = ConfigUtils.masterStormConfKey(topoId);
+        WithAcl<Map<String, Object>> cached = confs.get(topoId);
+        if (cached == null) {
+            //We need to read a new one
+            Map<String, Object> topoConf = Utils.fromCompressedJsonConf(store.readBlob(key, who));
+            ReadableBlobMeta meta = store.getBlobMeta(key, who);
+            cached = new WithAcl<>(meta.get_settable().get_acl(), topoConf);
+            WithAcl<Map<String, Object>> previous = confs.putIfAbsent(topoId, cached);
+            if (previous != null) {
+                cached = previous;
+            }
+        } else {
+            //Check if the user is allowed to read this
+            aclHandler.hasPermissions(cached.acl, READ, who, key);
+        }
+        return cached.data;
+    }
+
+    /**
+     * Delete a topology conf when we are done.
+     * @param topoId the id of the topology
+     * @param who who is deleting it
+     * @throws AuthorizationException if who is not allowed to delete the topo conf
+     * @throws KeyNotFoundException if the topo conf is not found in the blob store
+     */
+    public void deleteTopoConf(final String topoId, final Subject who) throws AuthorizationException, KeyNotFoundException {
+        final String key = ConfigUtils.masterStormConfKey(topoId);
+        store.deleteBlob(key, who);
+        confs.remove(topoId);
+    }
+
+    /**
+     * Add a new topology config.
+     * @param topoId the id of the topology
+     * @param who who is doing it
+     * @param topoConf the topology conf itself
+     * @throws AuthorizationException if who is not allowed to add a topology conf
+     * @throws KeyAlreadyExistsException if the topology conf already exists in the blob store
+     * @throws IOException on any error interacting with the blob store.
+     */
+    public void addTopoConf(final String topoId, final Subject who, final Map<String, Object> topoConf)
+        throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        final String key = ConfigUtils.masterStormConfKey(topoId);
+        final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+        store.createBlob(key, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(acl), who);
+        confs.put(topoId, new WithAcl<>(acl, topoConf));
+    }
+
+    /**
+     * Update an existing topology conf.
+     * @param topoId the id of the topology
+     * @param who who is doing it
+     * @param topoConf the new topology conf to save
+     * @throws AuthorizationException if who is not allowed to update the topology conf
+     * @throws KeyNotFoundException if the topology conf is not found in the blob store
+     * @throws IOException on any error interacting with the blob store.
+     */
+    public void updateTopoConf(final String topoId, final Subject who, final Map<String, Object> topoConf)
+        throws AuthorizationException, KeyNotFoundException, IOException {
+        final String key = ConfigUtils.masterStormConfKey(topoId);
+        store.updateBlob(key, Utils.toCompressedJsonConf(topoConf), who);
+        List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+        WithAcl<Map<String, Object>> old = confs.get(topoId);
+        if (old != null) {
+            acl = old.acl;
+        } else {
+            acl = store.getBlobMeta(key, who).get_settable().get_acl();
+        }
+        confs.put(topoId, new WithAcl<>(acl, topoConf));
+    }
+
+    /**
+     * Clear all entries from the Cache. This typically happens right after becoming a leader, just to be sure
+     * nothing has changed while we were not the leader.
+     */
+    public void clear() {
+        confs.clear();
+        topos.clear();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 09c14ba..6bf39c3 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.daemon.nimbus.TopoCache;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
@@ -44,9 +45,11 @@ public class LeaderElectorImp implements ILeaderElector {
     private final AtomicReference<LeaderLatch> leaderLatch;
     private final AtomicReference<LeaderLatchListener> leaderLatchListener;
     private final BlobStore blobStore;
+    private final TopoCache tc;
 
-    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
-            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
+    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
+                            AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
+                            BlobStore blobStore, final TopoCache tc) {
         this.conf = conf;
         this.servers = servers;
         this.zk = zk;
@@ -55,6 +58,7 @@ public class LeaderElectorImp implements ILeaderElector {
         this.leaderLatch = leaderLatch;
         this.leaderLatchListener = leaderLatchListener;
         this.blobStore = blobStore;
+        this.tc = tc;
     }
 
     @Override
@@ -67,7 +71,7 @@ public class LeaderElectorImp implements ILeaderElector {
         // if this latch is already closed, we need to create new instance.
         if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
             leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
-            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
+            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get(), tc));
             LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
         }
         // Only if the latch is not already started we invoke start

http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index 4979588..802d3ba 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -31,6 +31,7 @@ import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.daemon.nimbus.TopoCache;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.StormTopology;
@@ -124,7 +125,10 @@ public class Zookeeper {
     }
 
     // Leader latch listener that will be invoked when we either gain or lose leadership
-    public static LeaderLatchListener leaderLatchListenerImpl(final Map<String, Object> conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
+    public static LeaderLatchListener leaderLatchListenerImpl(final Map<String, Object> conf, final CuratorFramework zk,
+                                                              final BlobStore blobStore, final LeaderLatch leaderLatch,
+                                                              final TopoCache tc)
+        throws UnknownHostException {
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
             final String STORM_JAR_SUFFIX = "-stormjar.jar";
@@ -133,7 +137,8 @@ public class Zookeeper {
 
             @Override
             public void isLeader() {
-                Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+                Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk,
+                    conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
 
                 Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                 Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -157,8 +162,10 @@ public class Zookeeper {
 
                     if (diffDependencies.isEmpty()) {
                         LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
+                        tc.clear();
                     } else {
-                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
+                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+                            + "giving up leadership.");
                         closeLatch();
                     }
                 } else {
@@ -170,6 +177,8 @@ public class Zookeeper {
             @Override
             public void notLeader() {
                 LOG.info("{} lost leadership.", hostName);
+                //Just to be sure nothing stale stays cached after losing leadership
+                tc.clear();
             }
 
             private String generateJoinedString(Set<String> activeTopologyIds) {
@@ -241,11 +250,13 @@ public class Zookeeper {
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf, blobStore);
+    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc)
+        throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, blobStore, tc);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
+    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc)
+        throws UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), conf);
@@ -253,9 +264,9 @@ public class Zookeeper {
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
-                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get(), tc));
         return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
-            leaderLatchListenerAtomicReference, blobStore);
+            leaderLatchListenerAtomicReference, blobStore, tc);
     }
 
 }


Mime
View raw message