storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [1/3] storm git commit: STORM-2323 Precondition for Leader Nimbus should check all topology blobs and also corresponding dependencies
Date Wed, 01 Feb 2017 02:15:56 GMT
Repository: storm
Updated Branches:
  refs/heads/master 4c5e34ee6 -> 22cecb087


STORM-2323 Precondition for Leader Nimbus should check all topology blobs and also corresponding
dependencies

* change the precondition for leader Nimbus
** it should have all active topology blobs and corresponding dependencies locally


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7373f43
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7373f43
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7373f43

Branch: refs/heads/master
Commit: f7373f43511d1410f0f026034a4c8194e5f1a7f1
Parents: 1811273
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Wed Jan 25 13:16:44 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Jan 26 08:58:20 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/zookeeper/Zookeeper.java   | 119 ++++++++++++++++---
 1 file changed, 100 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f7373f43/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index ef35307..a2ad797 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -20,6 +20,7 @@ package org.apache.storm.zookeeper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -31,15 +32,17 @@ import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.KeyFilter;
+import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
 import org.apache.storm.cluster.ClusterUtils;
-import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
@@ -51,6 +54,7 @@ import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -336,29 +340,43 @@ public class Zookeeper {
     public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework
zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException
{
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
+            final String STORM_JAR_SUFFIX = "-stormjar.jar";
+            final String STORM_CODE_SUFFIX = "-stormcode.ser";
+            final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
             @Override
             public void isLeader() {
-                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk,
conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
-                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>()
{
-                    @Override
-                    public String filter(String key) {
-                        return ConfigUtils.getIdFromBlobKey(key);
-                    }
-                });
-                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds,
localTopologyIds);
-                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology
[{}]",
-                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk,
conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+
+                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
+                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
+                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
+                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
+
+                // this finds all active topologies blob keys from all local topology blob
keys
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys,
allLocalTopologyBlobKeys);
+                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs
[{}]",
+                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                         generateJoinedString(diffTopology));
 
                 if (diffTopology.isEmpty()) {
-                    LOG.info("Accepting leadership, all active topology found locally.");
+                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
+
+                    // this finds all dependency blob keys from active topologies from all
local blob keys
+                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies,
allLocalBlobKeys);
+                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies
[{}]",
+                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
+                            generateJoinedString(diffDependencies));
+
+                    if (diffDependencies.isEmpty()) {
+                        LOG.info("Accepting leadership, all active topologies and corresponding
dependencies found locally.");
+                    } else {
+                        LOG.info("Code for all active topologies is available locally, but
some dependencies are not found locally, giving up leadership.");
+                        closeLatch();
+                    }
                 } else {
                     LOG.info("code for all active topologies not available locally, giving
up leadership.");
-                    try {
-                        leaderLatch.close();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
+                    closeLatch();
                 }
             }
 
@@ -370,6 +388,69 @@ public class Zookeeper {
             private String generateJoinedString(Set<String> activeTopologyIds) {
                 return Joiner.on(",").join(activeTopologyIds);
             }
+
+            private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds)
{
+                Set<String> activeTopologyBlobKeys = new TreeSet<>();
+                for (String activeTopologyId : activeTopologyIds) {
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_JAR_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CODE_SUFFIX);
+                    activeTopologyBlobKeys.add(activeTopologyId + STORM_CONF_SUFFIX);
+                }
+                return activeTopologyBlobKeys;
+            }
+
+            private Set<String> filterTopologyBlobKeys(Set<String> blobKeys)
{
+                Set<String> topologyBlobKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_JAR_SUFFIX) || blobKey.endsWith(STORM_CODE_SUFFIX)
||
+                            blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                        topologyBlobKeys.add(blobKey);
+                    }
+                }
+                return topologyBlobKeys;
+            }
+
+            private Set<String> filterTopologyCodeKeys(Set<String> blobKeys)
{
+                Set<String> topologyCodeKeys = new HashSet<>();
+                for (String blobKey : blobKeys) {
+                    if (blobKey.endsWith(STORM_CODE_SUFFIX)) {
+                        topologyCodeKeys.add(blobKey);
+                    }
+                }
+                return topologyCodeKeys;
+            }
+
+            private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys)
{
+                Set<String> activeTopologyDependencies = new TreeSet<>();
+                Subject subject = ReqContext.context().subject();
+
+                for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
+                    try {
+                        InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey,
subject);
+                        byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
+                        StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
+                        if (stormCode.is_set_dependency_jars()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
+                        }
+                        if (stormCode.is_set_dependency_artifacts()) {
+                            activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
+                        }
+                    } catch (AuthorizationException | KeyNotFoundException | IOException
e) {
+                        LOG.error("Exception occurs while reading blob for key: " + activeTopologyCodeKey
+ ", exception: " + e, e);
+                        throw new RuntimeException("Exception occurs while reading blob for
key: " + activeTopologyCodeKey +
+                                ", exception: " + e, e);
+                    }
+                }
+                return activeTopologyDependencies;
+            }
+
+            private void closeLatch() {
+                try {
+                    leaderLatch.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         };
     }
 


Mime
View raw message