storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [1/3] storm git commit: STORM-2323 Precondition for Leader Nimbus should check all topology blobs and also corresponding dependencies
Date Wed, 01 Feb 2017 02:36:01 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 2a01dbc75 -> 83bf0ba8d


STORM-2323 Precondition for Leader Nimbus should check all topology blobs and also corresponding
dependencies

* change the precondition for leader Nimbus
** it should have all active topology blobs and corresponding dependencies locally
* change zookeeper.clj to use Zookeeper.leaderLatchListenerImpl


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b19b0a1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b19b0a1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b19b0a1d

Branch: refs/heads/1.x-branch
Commit: b19b0a1da4488bf91d60aa1487d6ed6109af4162
Parents: 194a9cb
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Wed Jan 25 13:16:44 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Mon Jan 30 22:14:47 2017 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/zookeeper.clj      |  29 +----
 .../org/apache/storm/zookeeper/Zookeeper.java   | 119 ++++++++++++++++---
 2 files changed, 104 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b19b0a1d/storm-core/src/clj/org/apache/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj
index 2b5da55..ca41093 100644
--- a/storm-core/src/clj/org/apache/storm/zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -32,7 +32,8 @@
   (:import [java.io File])
   (:import [java.util List Map])
   (:import [org.apache.storm.utils Utils ZookeeperAuthInfo]
-           (org.apache.storm.blobstore KeyFilter BlobStore))
+           (org.apache.storm.blobstore KeyFilter BlobStore)
+           (org.apache.storm.zookeeper Zookeeper))
   (:use [org.apache.storm util log config]))
 
 (def zk-keeper-states
@@ -249,28 +250,6 @@
                 (filter [this key] (get-id-from-blob-key key)))]
     (set (.filterAndListKeys blob-store to-id))))
 
-(defn leader-latch-listener-impl
-  "Leader latch listener that will be invoked when we either gain or lose leadership"
-  [conf zk blob-store leader-latch]
-  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
-        STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
-    (reify LeaderLatchListener
-      (^void isLeader[this]
-        (log-message (str hostname " gained leadership, checking if it has all the topology
code locally."))
-        (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
-              local-topology-ids (set (code-ids blob-store))
-              diff-topology (set/difference active-topology-ids local-topology-ids)]
-          (log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids)
-                       "] local-topology-ids [" (clojure.string/join "," local-topology-ids)
-                       "] diff-topology [" (clojure.string/join "," diff-topology) "]")
-          (if (empty? diff-topology)
-            (log-message "Accepting leadership, all active topology found localy.")
-            (do
-              (log-message "code for all active topologies not available locally, giving
up leadership.")
-              (.close leader-latch)))))
-      (^void notLeader[this]
-        (log-message (str hostname " lost leadership."))))))
-
 (defn zk-leader-elector
   "Zookeeper Implementation of ILeaderElector."
   [conf blob-store]
@@ -279,7 +258,7 @@
         leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
         id (.toHostPortString (NimbusInfo/fromConf conf))
         leader-latch (atom (LeaderLatch. zk leader-lock-path id))
-        leader-latch-listener (atom (leader-latch-listener-impl conf zk blob-store @leader-latch))
+        leader-latch-listener (atom (Zookeeper/leaderLatchListenerImpl conf zk blob-store
@leader-latch))
         ]
     (reify ILeaderElector
       (prepare [this conf]
@@ -290,7 +269,7 @@
         (if (.equals LeaderLatch$State/CLOSED (.getState @leader-latch))
           (do
             (reset! leader-latch (LeaderLatch. zk leader-lock-path id))
-            (reset! leader-latch-listener (leader-latch-listener-impl conf zk blob-store
@leader-latch))
+            (reset! leader-latch-listener (Zookeeper/leaderLatchListenerImpl conf zk blob-store
@leader-latch))
             (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.")
             ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b19b0a1d/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index ef35307..a2ad797 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -20,6 +20,7 @@ package org.apache.storm.zookeeper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -31,15 +32,17 @@ import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.KeyFilter;
+import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
-import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
@@ -51,6 +54,7 @@ import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -336,29 +340,43 @@ public class Zookeeper {
     public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework
zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException
{
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
+            final String STORM_JAR_SUFFIX = "-stormjar.jar";
+            final String STORM_CODE_SUFFIX = "-stormcode.ser";
+            final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
             @Override
             public void isLeader() {
-                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk,
conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
-                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>()
{
-                    @Override
-                    public String filter(String key) {
-                        return ConfigUtils.getIdFromBlobKey(key);
-                    }
-                });
-                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds,
localTopologyIds);
-                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology
[{}]",
-                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk,
conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+
+                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
+                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
+                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
+                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
+
+                // this finds all active topologies blob keys from all local topology blob
keys
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys,
allLocalTopologyBlobKeys);
+                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs
[{}]",
+                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                         generateJoinedString(diffTopology));
 
                 if (diffTopology.isEmpty()) {
-                    LOG.info("Accepting leadership, all active topology found locally.");
+                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
+
+                    // this finds all dependency blob keys from active topologies from all
local blob keys
+                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies,
allLocalBlobKeys);
+                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies
[{}]",
+                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
+                            generateJoinedString(diffDependencies));
+
+                    if (diffDependencies.isEmpty()) {
+                        LOG.info("Accepting leadership, all active topologies and corresponding
dependencies found locally.");
+                    } else {
+                        LOG.info("Code for all active topologies is available locally, but
some dependencies are not found locally, giving up leadership.");
+                        closeLatch();
+                    }
                 } else {
                     LOG.info("code for all active topologies not available locally, giving
up leadership.");
-                    try {
-                        leaderLatch.close();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
+                    closeLatch();
                 }
             }
 
@@ -370,6 +388,69 @@ public class Zookeeper {
             private String generateJoinedString(Set<String> activeTopologyIds) {
                 return Joiner.on(",").join(activeTopologyIds);
             }
+
+            private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds)
{
+                Set<String> activeTopologyBlobKeys = new TreeSet<>();
+                for (String activeTopologyId : activeTopologyIds) {
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
+                }
+                return activeTopologyBlobKeys;
+            }
+
+            private Set<String> filterTopologyBlobKeys(Set<String> blobKeys)
{
+                Set<String> topologyBlobKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_JAR_SUFFIX) || blobKey.endsWith(STORM_CODE_SUFFIX)
||
+                            blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                        topologyBlobKeys.add(blobKey);
+                    }
+                }
+                return topologyBlobKeys;
+            }
+
+            private Set<String> filterTopologyCodeKeys(Set<String> blobKeys)
{
+                Set<String> topologyCodeKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
+                        topologyCodeKeys.add(blobKey);
+                    }
+                }
+                return topologyCodeKeys;
+            }
+
+            private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys)
{
+                Set<String> activeTopologyDependencies = new TreeSet<>();
+                Subject subject = ReqContext.context().subject();
+
+                for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
+                    try {
+                        InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey,
subject);
+                        byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
+                        StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
+                        if (stormCode.is_set_dependency_jars()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
+                        }
+                        if (stormCode.is_set_dependency_artifacts()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
+                        }
+                    } catch (AuthorizationException | KeyNotFoundException | IOException
e) {
+                        LOG.error("Exception occurs while reading blob for key: " + activeTopologyCodeKey
+ ", exception: " + e, e);
+                        throw new RuntimeException("Exception occurs while reading blob for
key: " + activeTopologyCodeKey +
+                                ", exception: " + e, e);
+                    }
+                }
+                return activeTopologyDependencies;
+            }
+
+            private void closeLatch() {
+                try {
+                    leaderLatch.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         };
     }
 


Mime
View raw message