storm-commits mailing list archives

From kabh...@apache.org
Subject [03/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:37 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 88e2791..8c9ce0a 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -17,23 +17,16 @@
  */
 package org.apache.storm.utils;
 
-import org.apache.storm.Config;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.blobstore.BlobStoreAclHandler;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.daemon.JarTransformer;
-import org.apache.storm.generated.*;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.serialization.DefaultSerializationDelegate;
-import org.apache.storm.serialization.SerializationDelegate;
 import clojure.lang.IFn;
-import clojure.lang.RT;
+import clojure.lang.Keyword;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
@@ -41,6 +34,31 @@ import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
 import org.apache.curator.ensemble.exhibitor.Exhibitors;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.daemon.JarTransformer;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.serialization.DefaultSerializationDelegate;
+import org.apache.storm.serialization.SerializationDelegate;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -58,12 +76,15 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -74,11 +95,16 @@ import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.net.UnknownHostException;
+import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
@@ -89,11 +115,14 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Matcher;
@@ -103,8 +132,27 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import clojure.lang.RT;
+
 public class Utils {
-    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static Utils _instance = new Utils();
+
+    /**
+     * Provide an instance of this class for delegates to use.  To mock out
+     * delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     * @param u a Utils instance
+     * @return the previously set instance
+     */
+    public static Utils setInstance(Utils u) {
+        Utils oldInstance = _instance;
+        _instance = u;
+        return oldInstance;
+    }
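For illustration only, a minimal sketch of how a test might use this hook; the MockedUtils subclass and the choice of currentClasspathImpl as the overridden method are hypothetical, not part of this commit:

    // Hypothetical test-only subclass overriding one delegated *Impl method.
    class MockedUtils extends Utils {
        @Override
        public String currentClasspathImpl() {
            return "";  // pretend the classpath is empty for this test
        }
    }

    // Install the mock, exercise the code under test, then restore the real instance.
    Utils previous = Utils.setInstance(new MockedUtils());
    try {
        assert Utils.currentClasspath().isEmpty();
    } finally {
        Utils.setInstance(previous);
    }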
+
+    public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
     public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
     public static final String CURRENT_BLOB_SUFFIX_ID = "current";
@@ -115,15 +163,35 @@ public class Utils {
     private static SerializationDelegate serializationDelegate;
     private static ClassLoader cl = null;
 
+    public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS"));
+    public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator");
+    public static final String CLASS_PATH_SEPARATOR = System.getProperty("path.separator");
+
+    public static final int SIGKILL = 9;
+    public static final int SIGTERM = 15;
+
     static {
-        Map conf = readStormConfig();
+        Map<String, Object> conf = readStormConfig();
         serializationDelegate = getSerializationDelegate(conf);
     }
 
-    public static Object newInstance(String klass) {
+    @SuppressWarnings("unchecked")
+    public static <T> T newInstance(String klass) {
+        try {
+            return newInstance((Class<T>)Class.forName(klass));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T newInstance(Class<T> klass) {
+        return _instance.newInstanceImpl(klass);
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    public <T> T newInstanceImpl(Class<T> klass) {
         try {
-            Class c = Class.forName(klass);
-            return c.newInstance();
+            return klass.newInstance();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -145,11 +213,11 @@ public class Utils {
         return serializationDelegate.deserialize(serialized, clazz);
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {
         try {
-            T ret = (T) c.newInstance();
+            T ret = c.newInstance();
             TDeserializer des = getDes();
-            des.deserialize((TBase)ret, b, offset, length);
+            des.deserialize((TBase) ret, b, offset, length);
             return ret;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -256,7 +324,6 @@ public class Utils {
         return ret.toString();
     }
 
-
     public static long bitXorVals(List<Long> coll) {
         long result = 0;
         for (Long val : coll) {
@@ -286,16 +353,17 @@ public class Utils {
         }
     }
 
-    public static Map findAndReadConfigFile(String name, boolean mustExist) {
+    public static Map<String, Object> findAndReadConfigFile(String name, boolean mustExist) {
         InputStream in = null;
         boolean confFileEmpty = false;
         try {
             in = getConfigFileInputStream(name);
             if (null != in) {
                 Yaml yaml = new Yaml(new SafeConstructor());
-                Map ret = (Map) yaml.load(new InputStreamReader(in));
+                @SuppressWarnings("unchecked")
+                Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(in));
                 if (null != ret) {
-                    return new HashMap(ret);
+                    return new HashMap<>(ret);
                 } else {
                     confFileEmpty = true;
                 }
@@ -307,7 +375,7 @@ public class Utils {
                 else
                     throw new RuntimeException("Could not find config file on classpath " + name);
             } else {
-                return new HashMap();
+                return new HashMap<>();
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -349,16 +417,16 @@ public class Utils {
     }
 
 
-    public static Map findAndReadConfigFile(String name) {
+    public static Map<String, Object> findAndReadConfigFile(String name) {
         return findAndReadConfigFile(name, true);
     }
 
-    public static Map readDefaultConfig() {
+    public static Map<String, Object> readDefaultConfig() {
         return findAndReadConfigFile("defaults.yaml", true);
     }
 
-    public static Map readCommandLineOpts() {
-        Map ret = new HashMap();
+    public static Map<String, Object> readCommandLineOpts() {
+        Map<String, Object> ret = new HashMap<>();
         String commandOptions = System.getProperty("storm.options");
         if (commandOptions != null) {
             /*
@@ -389,10 +457,10 @@ public class Utils {
         return ret;
     }
 
-    public static Map readStormConfig() {
-        Map ret = readDefaultConfig();
+    public static Map<String, Object> readStormConfig() {
+        Map<String, Object> ret = readDefaultConfig();
         String confFile = System.getProperty("storm.conf.file");
-        Map storm;
+        Map<String, Object> storm;
         if (confFile == null || confFile.equals("")) {
             storm = findAndReadConfigFile("storm.yaml", false);
         } else {
@@ -482,7 +550,11 @@ public class Utils {
         HashMap nconf = new HashMap(conf);
         // only enable cleanup of blobstore on nimbus
         nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
-        store.prepare(nconf, baseDir, nimbusInfo);
+
+        if(store != null) {
+            // store can be null during testing when mocking utils.
+            store.prepare(nconf, baseDir, nimbusInfo);
+        }
         return store;
     }
 
@@ -497,6 +569,11 @@ public class Utils {
      */
     public static void downloadResourcesAsSupervisor(String key, String localFile,
                                                      ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
+        _instance.downloadResourcesAsSupervisorImpl(key, localFile, cb);
+    }
+
+    public void downloadResourcesAsSupervisorImpl(String key, String localFile,
+            ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
         final int MAX_RETRY_ATTEMPTS = 2;
         final int ATTEMPTS_INTERVAL_TIME = 100;
         for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
@@ -515,11 +592,8 @@ public class Utils {
 
     private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
         boolean isSuccess = false;
-        FileOutputStream out = null;
-        InputStreamWithMeta in = null;
-        try {
-            out = new FileOutputStream(localFile);
-            in = cb.getBlob(key);
+        try (FileOutputStream out = new FileOutputStream(localFile);
+                InputStreamWithMeta in = cb.getBlob(key);) {
             long fileSize = in.getFileLength();
 
             byte[] buffer = new byte[1024];
@@ -533,17 +607,6 @@ public class Utils {
             isSuccess = (fileSize == downloadFileSize);
         } catch (TException | IOException e) {
             LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
-        } finally {
-            try {
-                if (out != null) {
-                    out.close();
-                }
-            } catch (IOException ignored) {}
-            try {
-                if (in != null) {
-                    in.close();
-                }
-            } catch (IOException ignored) {}
         }
         if (!isSuccess) {
             try {
@@ -555,8 +618,21 @@ public class Utils {
         return isSuccess;
     }
 
+    public static boolean checkFileExists(File path) {
+        return Files.exists(path.toPath());
+    }
+    
+    public static boolean checkFileExists(String path) {
+        return Files.exists(new File(path).toPath());
+    }
+
     public static boolean checkFileExists(String dir, String file) {
-        return Files.exists(new File(dir, file).toPath());
+        return checkFileExists(dir + FILE_PATH_SEPARATOR + file);
+    }
+
+    public static boolean CheckDirExists(String dir) {
+        File file = new File(dir);
+        return file.isDirectory();
     }
 
     public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException {
@@ -621,27 +697,19 @@ public class Utils {
     }
 
 
-    public static synchronized IFn loadClojureFn(String namespace, String name) {
+    public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
         try {
             clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
         } catch (Exception e) {
             //if playing from the repl and defining functions, file won't exist
         }
-        return (IFn) RT.var(namespace, name).deref();
+        return (clojure.lang.IFn) RT.var(namespace, name).deref();
     }
 
     public static boolean isSystemId(String id) {
         return id.startsWith("__");
     }
 
-    public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
-        Map<V, K> ret = new HashMap<V, K>();
-        for (Map.Entry<K, V> entry : map.entrySet()) {
-            ret.put(entry.getValue(), entry.getKey());
-        }
-        return ret;
-    }
-
     public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
         if (topology.get_spouts().containsKey(id)) {
             return topology.get_spouts().get(id).get_common();
@@ -709,7 +777,7 @@ public class Utils {
         }
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
         try {
             return Utils.thriftDeserialize(c, b, 0, b.length);
         } catch (Exception e) {
@@ -869,7 +937,7 @@ public class Utils {
         }
 
         boolean gzipped = inFile.toString().endsWith("gz");
-        if (onWindows()) {
+        if (isOnWindows()) {
             // Tar is not native to Windows. Use simple Java based implementation for
             // tests and simple tar archives
             unTarUsingJava(inFile, untarDir, gzipped);
@@ -911,7 +979,6 @@ public class Utils {
     private static void unTarUsingJava(File inFile, File untarDir,
                                        boolean gzipped) throws IOException {
         InputStream inputStream = null;
-        TarArchiveInputStream tis = null;
         try {
             if (gzipped) {
                 inputStream = new BufferedInputStream(new GZIPInputStream(
@@ -919,31 +986,15 @@ public class Utils {
             } else {
                 inputStream = new BufferedInputStream(new FileInputStream(inFile));
             }
-            tis = new TarArchiveInputStream(inputStream);
-            for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
-                unpackEntries(tis, entry, untarDir);
-                entry = tis.getNextTarEntry();
+            try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
+                for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+                    unpackEntries(tis, entry, untarDir);
+                    entry = tis.getNextTarEntry();
+                }
             }
         } finally {
-            cleanup(tis, inputStream);
-        }
-    }
-
-    /**
-     * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
-     * null pointers. Must only be used for cleanup in exception handlers.
-     *
-     * @param closeables the objects to close
-     */
-    private static void cleanup(java.io.Closeable... closeables) {
-        for (java.io.Closeable c : closeables) {
-            if (c != null) {
-                try {
-                    c.close();
-                } catch (IOException e) {
-                    LOG.debug("Exception in closing " + c, e);
-
-                }
+            if(inputStream != null) {
+                inputStream.close();
             }
         }
     }
@@ -965,7 +1016,7 @@ public class Utils {
         if (!outputFile.getParentFile().exists()) {
             if (!outputFile.getParentFile().mkdirs()) {
                 throw new IOException("Mkdirs failed to create tar internal dir "
-                        + outputDir);
+                                      + outputDir);
             }
         }
         int count;
@@ -980,13 +1031,17 @@ public class Utils {
         outputStream.close();
     }
 
-    public static boolean onWindows() {
+    public static boolean isOnWindows() {
         if (System.getenv("OS") != null) {
             return System.getenv("OS").equals("Windows_NT");
         }
         return false;
     }
 
+    public static boolean isAbsolutePath(String path) {
+        return Paths.get(path).isAbsolute();
+    }
+
     public static void unpack(File localrsrc, File dst) throws IOException {
         String lowerDst = localrsrc.getName().toLowerCase();
         if (lowerDst.endsWith(".jar")) {
@@ -1026,6 +1081,10 @@ public class Utils {
         return newCurator(conf, servers, port, root, null);
     }
 
+    public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
+        return newCurator(conf, servers, port, "", auth);
+    }
+
     public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
         List<String> serverPorts = new ArrayList<String>();
         for (String zkServer : servers) {
@@ -1066,17 +1125,19 @@ public class Utils {
             .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
             .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
             .retryPolicy(new StormBoundedExponentialBackoffRetry(
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
 
         if (auth != null && auth.scheme != null && auth.payload != null) {
             builder.authorization(auth.scheme, auth.payload);
         }
     }
 
-    public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
-        return newCurator(conf, servers, port, "", auth);
+    public static void testSetupBuilder(CuratorFrameworkFactory.Builder
+                                                builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
+    {
+        setupBuilder(builder, zkStr, conf, auth);
     }
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
@@ -1117,10 +1178,16 @@ public class Utils {
                 LOG.info("{}:{}", prefix, line);
             }
         } catch (IOException e) {
-            LOG.warn("Error whiel trying to log stream", e);
+            LOG.warn("Error while trying to log stream", e);
         }
     }
 
+    /**
+     * Checks if a throwable, or any throwable in its chain of causes, is an
+     * instance of a particular class.
+     * @param klass The class you're expecting
+     * @param throwable The throwable whose cause chain is checked against klass
+     * @return true if the throwable or one of its causes is an instance of klass, false otherwise.
+     */
     public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
         Throwable t = throwable;
         while (t != null) {
@@ -1156,6 +1223,7 @@ public class Utils {
                 && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
+
     public static List<ACL> getWorkerACL(Map conf) {
         //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
         if (!isZkAuthenticationConfiguredTopology(conf)) {
@@ -1163,11 +1231,11 @@ public class Utils {
         }
         String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
-            throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+            throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
-        String[] split = stormZKUser.split(":",2);
+        String[] split = stormZKUser.split(":", 2);
         if (split.length != 2) {
-            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
         }
         ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
         ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
@@ -1206,6 +1274,10 @@ public class Utils {
         }
     }
 
+    /**
+     * Gets some information, including stack traces, for the threads running in this JVM.
+     * @return A human-readable string of the dump.
+     */
     public static String threadDump() {
         final StringBuilder dump = new StringBuilder();
         final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
@@ -1231,20 +1303,19 @@ public class Utils {
         return dump.toString();
     }
 
-    // Assumes caller is synchronizing
+    /**
+     * Creates an instance of the pluggable SerializationDelegate or falls back to
+     * DefaultSerializationDelegate if something goes wrong.
+     * @param stormConf The config from which to pull the name of the pluggable class.
+     * @return an instance of the class specified by storm.meta.serialization.delegate
+     */
     private static SerializationDelegate getSerializationDelegate(Map stormConf) {
         String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
         SerializationDelegate delegate;
         try {
             Class delegateClass = Class.forName(delegateClassName);
             delegate = (SerializationDelegate) delegateClass.newInstance();
-        } catch (ClassNotFoundException e) {
-            LOG.error("Failed to construct serialization delegate, falling back to default", e);
-            delegate = new DefaultSerializationDelegate();
-        } catch (InstantiationException e) {
-            LOG.error("Failed to construct serialization delegate, falling back to default", e);
-            delegate = new DefaultSerializationDelegate();
-        } catch (IllegalAccessException e) {
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
             LOG.error("Failed to construct serialization delegate, falling back to default", e);
             delegate = new DefaultSerializationDelegate();
         }
@@ -1290,7 +1361,7 @@ public class Utils {
                         if (!file.getParentFile().mkdirs()) {
                             if (!file.getParentFile().isDirectory()) {
                                 throw new IOException("Mkdirs failed to create " +
-                                        file.getParentFile().toString());
+                                                      file.getParentFile().toString());
                             }
                         }
                         OutputStream out = new FileOutputStream(file);
@@ -1353,13 +1424,16 @@ public class Utils {
             }
             if (memoryOpts != null) {
                 int unit = 1;
-                if (memoryOpts.toLowerCase().endsWith("k")) {
+                memoryOpts = memoryOpts.toLowerCase();
+
+                if (memoryOpts.endsWith("k")) {
                     unit = 1024;
-                } else if (memoryOpts.toLowerCase().endsWith("m")) {
+                } else if (memoryOpts.endsWith("m")) {
                     unit = 1024 * 1024;
-                } else if (memoryOpts.toLowerCase().endsWith("g")) {
+                } else if (memoryOpts.endsWith("g")) {
                     unit = 1024 * 1024 * 1024;
                 }
+
                 memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", "");
                 Double result =  Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0;
                 return (result < 1.0) ? 1.0 : result;
@@ -1382,18 +1456,29 @@ public class Utils {
     }
 
     public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
-        TopologyInfo topologyInfo = null;
         try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) {
-            ClusterSummary summary = client.getClient().getClusterInfo();
+            String topologyId = getTopologyId(name, client.getClient());
+            if (null != topologyId) {
+                return client.getClient().getTopologyInfo(topologyId);
+            }
+            return null;
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String getTopologyId(String name, Nimbus.Client client) {
+        try {
+            ClusterSummary summary = client.getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
                 if(s.get_name().equals(name)) {
-                    topologyInfo = client.getClient().getTopologyInfo(s.get_id());
+                    return s.get_id();
                 }
             }
         } catch(Exception e) {
             throw new RuntimeException(e);
         }
-        return topologyInfo;
+        return null;
     }
 
     /**
@@ -1408,5 +1493,745 @@ public class Utils {
     public static int toPositive(int number) {
         return number & Integer.MAX_VALUE;
     }
-}
 
+    public static GlobalStreamId getGlobalStreamId(String streamId, String componentId) {
+        if (componentId == null) {
+            return new GlobalStreamId(streamId, DEFAULT_STREAM_ID);
+        }
+        return new GlobalStreamId(streamId, componentId);
+    }
+
+    public static RuntimeException wrapInRuntime(Exception e){
+        if (e instanceof RuntimeException){
+            return (RuntimeException)e;
+        } else {
+            return new RuntimeException(e);
+        }
+    }
+
+    public static int getAvailablePort(int prefferedPort) {
+        int localPort = -1;
+        try(ServerSocket socket = new ServerSocket(prefferedPort)) {
+            localPort = socket.getLocalPort();
+        } catch(IOException exp) {
+            if (prefferedPort > 0) {
+                return getAvailablePort(0);
+            }
+        }
+        return localPort;
+    }
+
+    public static int getAvailablePort() {
+        return getAvailablePort(0);
+    }
+
+    /**
+     * Determines if a zip archive contains a particular directory.
+     *
+     * @param zipfile path to the zipped file
+     * @param target directory being looked for in the zip.
+     * @return boolean whether or not the directory exists in the zip.
+     */
+    public static boolean zipDoesContainDir(String zipfile, String target) throws IOException {
+        List<ZipEntry> entries = (List<ZipEntry>) Collections.list(new ZipFile(zipfile).entries());
+
+        String targetDir = target + "/";
+        for(ZipEntry entry : entries) {
+            String name = entry.getName();
+            if(name.startsWith(targetDir)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
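An illustrative call, assuming a hypothetical archive path and a context that handles the declared IOException:

    // True if the archive contains any entry under "resources/".
    boolean hasResources = Utils.zipDoesContainDir("/tmp/mytopology.jar", "resources");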
+
+    /**
+     * Joins any number of maps together into a single map, combining their values into
+     * a list, maintaining values in the order the maps were passed in. Null is inserted
+     * for a key when a given map does not contain that key.
+     *
+     * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) ->
+     *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]}
+     *
+     * @param maps variable number of maps to join - order affects order of values in output.
+     * @return combined map
+     */
+    public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+        Map<K, List<V>> ret = new HashMap<>();
+
+        Set<K> keys = new HashSet<>();
+
+        for(Map<K, V> map : maps) {
+            keys.addAll(map.keySet());
+        }
+
+        for(Map<K, V> m : maps) {
+            for(K key : keys) {
+                V value = m.get(key);
+
+                if(!ret.containsKey(key)) {
+                    ret.put(key, new ArrayList<V>());
+                }
+
+                List<V> targetList = ret.get(key);
+                targetList.add(value);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Fills up chunks out of a collection (given a maximum number of chunks)
+     *
+     * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+     *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+     *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+     * @param maxNumChunks the maximum number of chunks to return
+     * @param coll the collection to be chunked up
+     * @return a list of the chunks, which are themselves lists.
+     */
+    public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
+        List<List<T>> ret = new ArrayList<>();
+
+        if(maxNumChunks == 0 || coll == null) {
+            return ret;
+        }
+
+        Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks);
+
+        // Keys sorted in descending order
+        List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+        Collections.sort(sortedKeys, Collections.reverseOrder());
+
+
+        Iterator<T> it = coll.iterator();
+        for(Integer chunkSize : sortedKeys) {
+            if(!it.hasNext()) { break; }
+            Integer times = parts.get(chunkSize);
+            for(int i = 0; i < times; i++) {
+                if(!it.hasNext()) { break; }
+                List<T> chunkList = new ArrayList<>();
+                for(int j = 0; j < chunkSize; j++) {
+                    if(!it.hasNext()) { break; }
+                    chunkList.add(it.next());
+                }
+                ret.add(chunkList);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Return a new instance of a pluggable class specified in the conf.
+     * @param conf The conf to read from.
+     * @param configKey The key pointing to the pluggable class
+     * @return an instance of the class or null if it is not specified.
+     */
+    public static Object getConfiguredClass(Map conf, Object configKey) {
+        if (conf.containsKey(configKey)) {
+            return newInstance((String)conf.get(configKey));
+        }
+        return null;
+    }
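A hedged usage sketch; storm.scheduler is just one plausible key whose value names a class, and any such key works:

    Map conf = Utils.readStormConfig();
    // Returns a new instance of the configured class, or null if the key is absent.
    Object scheduler = Utils.getConfiguredClass(conf, Config.STORM_SCHEDULER);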
+
+    public static String logsFilename(String stormId, String port) {
+        return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "worker.log";
+    }
+
+    public static String eventLogsFilename(String stormId, String port) {
+        return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "events.log";
+    }
+
+    public static Object readYamlFile(String yamlFile) {
+        try (FileReader reader = new FileReader(yamlFile)) {
+            return new Yaml(new SafeConstructor()).load(reader);
+        } catch(Exception ex) {
+            LOG.error("Failed to read yaml file.", ex);
+        }
+        return null;
+    }
+
+    public static void setupDefaultUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                public void uncaughtException(Thread thread, Throwable thrown) {
+                    try {
+                        handleUncaughtException(thrown);
+                    } catch (Error err) {
+                        LOG.error("Received error in main thread.. terminating server...", err);
+                        Runtime.getRuntime().exit(-2);
+                    }
+                }
+            });
+    }
+
+    /**
+     * Creates a new map with one string value in the map replaced with an
+     * equal-length string of '#' characters.
+     * @param m The map that a value will be redacted from
+     * @param key The key pointing to the value to be redacted
+     * @return a new map with the value redacted. The original map will not be modified.
+     */
+    public static Map<Object, String> redactValue(Map<Object, String> m, Object key) {
+        if(m.containsKey(key)) {
+            HashMap<Object, String> newMap = new HashMap<>(m);
+            String value = newMap.get(key);
+            String redacted = new String(new char[value.length()]).replace("\0", "#");
+            newMap.put(key, redacted);
+            return newMap;
+        }
+        return m;
+    }
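A usage sketch with a hypothetical key, masking a credential before a config is logged:

    Map<Object, String> creds = new HashMap<>();
    creds.put("some.secret.password", "hunter2");
    // The returned copy holds "#######" (same length as the original value);
    // the original map is not modified.
    Map<Object, String> safe = Utils.redactValue(creds, "some.secret.password");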
+
+    /**
+     * Make sure a given key name is valid for the storm config.
+     * Throw RuntimeException if the key isn't valid.
+     * @param name The name of the config key to check.
+     */
+    private static final Set<String> disallowedKeys = new HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
+    public static void validateKeyName(String name) {
+
+        for(String key : disallowedKeys) {
+            if( name.contains(key) ) {
+                throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString());
+            }
+        }
+        if(name.trim().isEmpty()) {
+            throw new RuntimeException("Key name cannot be blank");
+        }
+    }
+
+    public static String localHostname () throws UnknownHostException {
+        return _instance.localHostnameImpl();
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    protected String localHostnameImpl () throws UnknownHostException {
+        return InetAddress.getLocalHost().getCanonicalHostName();
+    }
+
+    private static String memoizedLocalHostnameString = null;
+
+    public static String memoizedLocalHostname () throws UnknownHostException {
+        if (memoizedLocalHostnameString == null) {
+            memoizedLocalHostnameString = localHostname();
+        }
+        return memoizedLocalHostnameString;
+    }
+
+    /**
+     * Gets the storm.local.hostname value, or tries to figure out the local hostname
+     * if it is not set in the config.
+     * @param conf The storm config to read from
+     * @return a string representation of the hostname.
+    */
+    public static String hostname (Map<String, Object> conf) throws UnknownHostException  {
+        if (conf == null) {
+            return memoizedLocalHostname();
+        }
+        Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+        if (hostnameString == null || hostnameString.equals("")) {
+            return memoizedLocalHostname();
+        }
+        return (String)hostnameString;
+    }
+
+    public static String uuid() {
+        return UUID.randomUUID().toString();
+    }
+
+    public static void exitProcess (int val, String msg) {
+        String combinedErrorMessage = "Halting process: " + msg;
+        LOG.error(combinedErrorMessage, new RuntimeException(combinedErrorMessage));
+        Runtime.getRuntime().exit(val);
+    }
+
+    public static Runnable mkSuicideFn() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                Utils.exitProcess(1, "Worker died");
+            }
+        };
+    }
+
+    /**
+     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+     *
+     * Example usage in java:
+     *  Map<Integer, String> tasks;
+     *  Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);
+     *
+     * The order of he resulting list values depends on the ordering properties
+     * of the Map passed in. The caller is responsible for passing an ordered
+     * map if they expect the result to be consistently ordered as well.
+     *
+     * @param map to reverse
+     * @return a reversed map
+     */
+    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+        if (map == null) {
+            return rtn;
+        }
+        for (Entry<K, V> entry : map.entrySet()) {
+            K key = entry.getKey();
+            V val = entry.getValue();
+            List<K> list = rtn.get(val);
+            if (list == null) {
+                list = new ArrayList<K>();
+                rtn.put(entry.getValue(), list);
+            }
+            list.add(key);
+        }
+        return rtn;
+    }
+
+    /**
+     * "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}"
+     * Reverses an assoc-list style Map like reverseMap(Map...)
+     *
+     * @param listSeq to reverse
+     * @return a reversed map
+     */
+    public static HashMap reverseMap(List listSeq) {
+        HashMap<Object, List<Object>> rtn = new HashMap();
+        if (listSeq == null) {
+            return rtn;
+        }
+        for (Object entry : listSeq) {
+            List listEntry = (List) entry;
+            Object key = listEntry.get(0);
+            Object val = listEntry.get(1);
+            List list = rtn.get(val);
+            if (list == null) {
+                list = new ArrayList<Object>();
+                rtn.put(val, list);
+            }
+            list.add(key);
+        }
+        return rtn;
+    }
+
+
+    /**
+     * @return the pid of this JVM, because Java doesn't provide a real way to do this.
+     */
+    public static String processPid() {
+        String name = ManagementFactory.getRuntimeMXBean().getName();
+        String[] split = name.split("@");
+        if (split.length != 2) {
+            throw new RuntimeException("Got unexpected process name: " + name);
+        }
+        return split[0];
+    }
+
+    public static int execCommand(String... command) throws ExecuteException, IOException {
+        CommandLine cmd = new CommandLine(command[0]);
+        for (int i = 1; i < command.length; i++) {
+            cmd.addArgument(command[i]);
+        }
+
+        DefaultExecutor exec = new DefaultExecutor();
+        return exec.execute(cmd);
+    }
+
+    /**
+     * Extract dir from the jar to destdir
+     *
+     * @param jarpath Path to the jar file
+     * @param dir Directory in the jar to pull out
+     * @param destdir Path to the directory where the extracted directory will be put
+     */
+    public static void extractDirFromJar(String jarpath, String dir, File destdir) {
+        _instance.extractDirFromJarImpl(jarpath, dir, destdir);
+    }
+    
+    public void extractDirFromJarImpl(String jarpath, String dir, File destdir) {
+        try (JarFile jarFile = new JarFile(jarpath)) {
+            Enumeration<JarEntry> jarEnums = jarFile.entries();
+            while (jarEnums.hasMoreElements()) {
+                JarEntry entry = jarEnums.nextElement();
+                if (!entry.isDirectory() && entry.getName().startsWith(dir)) {
+                    File aFile = new File(destdir, entry.getName());
+                    aFile.getParentFile().mkdirs();
+                    try (FileOutputStream out = new FileOutputStream(aFile);
+                         InputStream in = jarFile.getInputStream(entry)) {
+                        IOUtils.copy(in, out);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            LOG.info("Could not extract {} from {}", dir, jarpath);
+        }
+    }
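An illustrative call with hypothetical paths, copying the jar's "resources" directory into a destination directory:

    Utils.extractDirFromJar("/tmp/mytopology.jar", "resources", new File("/tmp/stormdist"));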
+
+    public static void sendSignalToProcess(long lpid, int signum) throws IOException {
+        String pid = Long.toString(lpid);
+        try {
+            if (isOnWindows()) {
+                if (signum == SIGKILL) {
+                    execCommand("taskkill", "/f", "/pid", pid);
+                } else {
+                    execCommand("taskkill", "/pid", pid);
+                }
+            } else {
+                execCommand("kill", "-" + signum, pid);
+            }
+        } catch (ExecuteException e) {
+            LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
+        } catch (IOException e) {
+            LOG.info("IOException Error when trying to kill {}.", pid);
+            throw e;
+        }
+    }
+
+    public static void forceKillProcess (String pid) throws IOException {
+        sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+    }
+
+    public static void killProcessWithSigTerm (String pid) throws IOException {
+        sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+    }
+
+    /**
+     * Adds the user-supplied function as a shutdown hook for cleanup.
+     * Also adds a function that sleeps for a second and then halts the
+     * runtime to avoid any zombie process in case the cleanup function hangs.
+     */
+    public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+        Runnable sleepKill = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Time.sleepSecs(1);
+                    Runtime.getRuntime().halt(20);
+                } catch (Exception e) {
+                    LOG.warn("Exception in the ShutDownHook", e);
+                }
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(func));
+        Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+    }
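A minimal sketch of registering a cleanup hook (the cleanup body is hypothetical); if the hook hangs, the companion hook halts the JVM after roughly one second:

    Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
        @Override
        public void run() {
            // hypothetical cleanup work, e.g. closing sockets or flushing state
        }
    });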
+
+    /**
+     * Returns the combined string, escaped for posix shell.
+     * @param command the list of strings to be combined
+     * @return the resulting command string
+     */
+    public static String shellCmd (List<String> command) {
+        List<String> changedCommands = new ArrayList<>(command.size());
+        for (String str: command) {
+            if (str == null) {
+                continue;
+            }
+            changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") + "'");
+        }
+        return StringUtils.join(changedCommands, " ");
+    }
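For example (illustrative, not part of the commit), an argument containing a single quote is escaped with the '"'"' idiom so the shell still sees it as one word:

    List<String> cmd = Arrays.asList("echo", "it's done");
    String escaped = Utils.shellCmd(cmd);
    // escaped is: 'echo' 'it'"'"'s done'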
+
+    public static String scriptFilePath (String dir) {
+        return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+    }
+
+    public static String containerFilePath (String dir) {
+        return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+    }
+
+    public static Object nullToZero (Object v) {
+        return (v != null ? v : 0);
+    }
+
+    /**
+     * Deletes a file or directory and its contents if it exists. Does not
+     * complain if the input is null or does not exist.
+     * @param path the path to the file or directory
+     */
+    public static void forceDelete(String path) throws IOException {
+        _instance.forceDeleteImpl(path);
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    protected void forceDeleteImpl(String path) throws IOException {
+        LOG.debug("Deleting path {}", path);
+        if (checkFileExists(path)) {
+            try {
+                FileUtils.forceDelete(new File(path));
+            } catch (FileNotFoundException ignored) {}
+        }
+    }
+
+    /**
+     * Returns a Collection of file names found under the given directory.
+     * @param dir a directory
+     * @return the Collection of file names
+     */
+    public static Collection<String> readDirContents(String dir) {
+        Collection<String> ret = new HashSet<>();
+        File[] files = new File(dir).listFiles();
+        if (files != null) {
+            for (File f: files) {
+                ret.add(f.getName());
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Returns the value of java.class.path System property. Kept separate for
+     * testing.
+     * @return the classpath
+     */
+    public static String currentClasspath() {
+        return _instance.currentClasspathImpl();
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    public String currentClasspathImpl() {
+        return System.getProperty("java.class.path");
+    }
+
+    public static String addToClasspath(String classpath,
+                Collection<String> paths) {
+        return _instance.addToClasspathImpl(classpath, paths);
+    }
+
+    public static String addToClasspath(Collection<String> classpaths,
+                Collection<String> paths) {
+        return _instance.addToClasspathImpl(classpaths, paths);
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    public String addToClasspathImpl(String classpath,
+                Collection<String> paths) {
+        if (paths == null || paths.isEmpty()) {
+            return classpath;
+        }
+        List<String> l = new LinkedList<>();
+        l.add(classpath);
+        l.addAll(paths);
+        return StringUtils.join(l, CLASS_PATH_SEPARATOR);
+    }
+
+    public String addToClasspathImpl(Collection<String> classpaths,
+                Collection<String> paths) {
+        List<String> allPaths = new ArrayList<>();
+        if(classpaths != null) {
+            allPaths.addAll(classpaths);
+        }
+        if(paths != null) {
+            allPaths.addAll(paths);
+        }
+        return StringUtils.join(allPaths, CLASS_PATH_SEPARATOR);
+    }
+
+    public static class UptimeComputer {
+        int startTime = 0;
+
+        public UptimeComputer() {
+            startTime = Time.currentTimeSecs();
+        }
+
+        public int upTime() {
+            return Time.deltaSecs(startTime);
+        }
+    }
+
+    public static UptimeComputer makeUptimeComputer() {
+        return _instance.makeUptimeComputerImpl();
+    }
+
+    // Non-static impl methods exist for mocking purposes.
+    public UptimeComputer makeUptimeComputerImpl() {
+        return new UptimeComputer();
+    }
+
+    /**
+     * Writes a posix shell script file to be executed in its own process.
+     * @param dir the directory under which the script is to be written
+     * @param command the command the script is to execute
+     * @param environment optional environment variables to set before running the script's command. May be  null.
+     * @return the path to the script that has been written
+     */
+    public static String writeScript(String dir, List<String> command,
+                                     Map<String,String> environment) throws IOException {
+        String path = Utils.scriptFilePath(dir);
+        try(BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+            out.write("#!/bin/bash");
+            out.newLine();
+            if (environment != null) {
+                for (String k : environment.keySet()) {
+                    String v = environment.get(k);
+                    if (v == null) {
+                        v = "";
+                    }
+                    out.write(Utils.shellCmd(
+                            Arrays.asList(
+                                    "export",k+"="+v)));
+                    out.write(";");
+                    out.newLine();
+                }
+            }
+            out.newLine();
+            out.write("exec "+Utils.shellCmd(command)+";");
+        }
+        return path;
+    }
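A usage sketch under assumed values; the directory, classpath, main class, and environment variable are hypothetical, and the caller handles the declared IOException:

    Map<String, String> env = new HashMap<>();
    env.put("WORKER_LOG_DIR", "/var/log/storm");
    List<String> cmd = Arrays.asList("java", "-cp", "worker.jar", "org.example.Worker");
    String script = Utils.writeScript("/tmp/worker-6700", cmd, env);
    // The script exports WORKER_LOG_DIR and then exec's the command; it could be
    // launched with, e.g., Utils.execCommand("bash", script).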
+
+    /**
+     * A thread that can answer if it is sleeping in the case of simulated time.
+     * This class is not useful when simulated time is not being used.
+     */
+    public static class SmartThread extends Thread {
+        public boolean isSleeping() {
+            return Time.isThreadWaiting(this);
+        }
+        public SmartThread(Runnable r) {
+            super(r);
+        }
+    }
+
+    /**
+     * Creates a thread that calls the given code repeatedly, sleeping for an
+     * interval of seconds equal to the return value of the previous call.
+     *
+     * The given afn may be a callable that returns the number of seconds to
+     * sleep, or it may be a Callable that returns another Callable that in turn
+     * returns the number of seconds to sleep. In the latter case, isFactory should be set to true.
+     *
+     * @param afn the code to call on each iteration
+     * @param isDaemon whether the new thread should be a daemon thread
+     * @param eh code to call when afn throws an exception
+     * @param priority the new thread's priority
+     * @param isFactory whether afn returns a callable instead of sleep seconds
+     * @param startImmediately whether to start the thread before returning
+     * @param threadName a suffix to be appended to the thread name
+     * @return the newly created thread
+     * @see java.lang.Thread
+     */
+    public static SmartThread asyncLoop(final Callable afn,
+            boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+            int priority, final boolean isFactory, boolean startImmediately,
+            String threadName) {
+        SmartThread thread = new SmartThread(new Runnable() {
+            public void run() {
+                Object s;
+                try {
+                    Callable fn = isFactory ? (Callable) afn.call() : afn;
+                    while ((s = fn.call()) instanceof Long) {
+                        Time.sleepSecs((Long) s);
+                    }
+                } catch (Throwable t) {
+                    if (Utils.exceptionCauseIsInstanceOf(
+                            InterruptedException.class, t)) {
+                        LOG.info("Async loop interrupted!");
+                        return;
+                    }
+                    LOG.error("Async loop died!", t);
+                    throw new RuntimeException(t);
+                }
+            }
+        });
+        if (eh != null) {
+            thread.setUncaughtExceptionHandler(eh);
+        } else {
+            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                public void uncaughtException(Thread t, Throwable e) {
+                    LOG.error("Async loop died!", e);
+                    Utils.exitProcess(1, "Async loop died!");
+                }
+            });
+        }
+        thread.setDaemon(isDaemon);
+        thread.setPriority(priority);
+        if (threadName != null && !threadName.isEmpty()) {
+            thread.setName(thread.getName() +"-"+ threadName);
+        }
+        if (startImmediately) {
+            thread.start();
+        }
+        return thread;
+    }
+
+    /**
+     * Convenience method used when only the function and name suffix are given.
+     * @param afn the code to call on each iteration
+     * @param threadName a suffix to be appended to the thread name
+     * @return the newly created thread
+     * @see java.lang.Thread
+     */
+    public static SmartThread asyncLoop(final Callable afn, String threadName, final Thread.UncaughtExceptionHandler eh) {
+        return asyncLoop(afn, false, eh, Thread.NORM_PRIORITY, false, true,
+                threadName);
+    }
+
+    /**
+     * Convenience method used when only the function is given.
+     * @param afn the code to call on each iteration
+     * @return the newly created thread
+     */
+    public static SmartThread asyncLoop(final Callable afn) {
+        return asyncLoop(afn, false, null, Thread.NORM_PRIORITY, false, true,
+                null);
+    }
+
+    public static <T> List<T> interleaveAll(List<List<T>> nodeList) {
+        if (nodeList != null && nodeList.size() > 0) {
+            List<T> first = new ArrayList<T>();
+            List<List<T>> rest = new ArrayList<List<T>>();
+            for (List<T> node : nodeList) {
+                if (node != null && node.size() > 0) {
+                  first.add(node.get(0));
+                  rest.add(node.subList(1, node.size()));
+                }
+            }
+            List<T> interleaveRest = interleaveAll(rest);
+            if (interleaveRest != null) {
+                first.addAll(interleaveRest);
+            }
+            return first;
+        }
+        return null;
+      }
+
+    public static long bitXor(Long a, Long b) {
+        return a ^ b;
+    }
+
+    public static List<String> getRepeat(List<String> list) {
+        List<String> rtn = new ArrayList<String>();
+        Set<String> idSet = new HashSet<String>();
+
+        for (String id : list) {
+            if (idSet.contains(id)) {
+                rtn.add(id);
+            } else {
+                idSet.add(id);
+            }
+        }
+
+        return rtn;
+    }
+
+    /**
+     * Converts a Clojure PersistentMap to a Java HashMap.
+     */
+    public static Map<String, Object> convertClojureMapToJavaMap(Map map) {
+        Map<String, Object> ret = new HashMap<>(map.size());
+        for (Object obj : map.entrySet()) {
+            Map.Entry entry = (Map.Entry) obj;
+            Keyword keyword = (Keyword) entry.getKey();
+            String key = keyword.getName();
+            if (key.startsWith(":")) {
+                key = key.substring(1, key.length());
+            }
+            Object value = entry.getValue();
+            ret.put(key, value);
+        }
+
+        return ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
new file mode 100644
index 0000000..74816c2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LeaderElectorImp implements ILeaderElector {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
+    private final Map conf;
+    private final List<String> servers;
+    private final CuratorFramework zk;
+    private final String leaderlockPath;
+    private final String id;
+    private final AtomicReference<LeaderLatch> leaderLatch;
+    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+    private final BlobStore blobStore;
+
+    public LeaderElectorImp(Map conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
+            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
+        this.conf = conf;
+        this.servers = servers;
+        this.zk = zk;
+        this.leaderlockPath = leaderlockPath;
+        this.id = id;
+        this.leaderLatch = leaderLatch;
+        this.leaderLatchListener = leaderLatchListener;
+        this.blobStore = blobStore;
+    }
+
+    @Override
+    public void prepare(Map conf) {
+        // no-op for zookeeper implementation
+    }
+
+    @Override
+    public void addToLeaderLockQueue() throws Exception {
+        // if this latch is already closed, we need to create new instance.
+        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
+            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
+            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
+            LOG.info("LeaderLatch was in closed state. Reset the leaderLatch and listener.");
+        }
+        // Only if the latch is not already started we invoke start
+        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
+            leaderLatch.get().addListener(leaderLatchListener.get());
+            leaderLatch.get().start();
+            LOG.info("Queued up for leader lock.");
+        } else {
+            LOG.info("Node already in queue for leader lock.");
+        }
+    }
+
+    // Only started latches can be closed.
+    @Override
+    public void removeFromLeaderLockQueue() throws Exception {
+        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
+            leaderLatch.get().close();
+            LOG.info("Removed from leader lock queue.");
+        } else {
+            LOG.info("Leader latch is not started, so there is nothing to remove from the leader lock queue.");
+        }
+    }
+
+    @Override
+    public boolean isLeader() throws Exception {
+        return leaderLatch.get().hasLeadership();
+    }
+
+    @Override
+    public NimbusInfo getLeader() {
+        try {
+            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    @Override
+    public List<NimbusInfo> getAllNimbuses() throws Exception {
+        List<NimbusInfo> nimbusInfos = new ArrayList<>();
+        Collection<Participant> participants = leaderLatch.get().getParticipants();
+        for (Participant participant : participants) {
+            nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
+        }
+        return nimbusInfos;
+    }
+
+    @Override
+    public void close() {
+        LOG.info("Closing zookeeper connection of leader elector.");
+        zk.close();
+    }
+}

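A minimal sketch of how this elector is typically obtained and driven; conf and blobStore are assumed to be the already-constructed nimbus configuration and blob store, and the class name LeaderElectionSketch is hypothetical:

import java.util.Map;

import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.zookeeper.Zookeeper;

public class LeaderElectionSketch {
    static void runElection(Map conf, BlobStore blobStore) throws Exception {
        // Builds a LeaderElectorImp backed by a fresh Curator client (see zkLeaderElector in Zookeeper.java below).
        ILeaderElector elector = Zookeeper.zkLeaderElector(conf, blobStore);
        elector.addToLeaderLockQueue();           // start the latch and queue up for leadership
        boolean leader = elector.isLeader();      // true once this node holds the latch
        NimbusInfo current = elector.getLeader(); // whoever currently holds leadership
        elector.removeFromLeaderLockQueue();      // close the latch without closing the zk client
        elector.close();                          // closes the underlying zookeeper connection
    }
}
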
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java
new file mode 100644
index 0000000..5247558
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import org.apache.zookeeper.Watcher;
+
+import java.util.HashMap;
+
+public class ZkEventTypes {
+
+    private static final HashMap<Watcher.Event.EventType, String> map;
+
+    static {
+        map = new HashMap<>();
+
+        map.put(Watcher.Event.EventType.None, ":none");
+        map.put(Watcher.Event.EventType.NodeCreated, ":node-created");
+        map.put(Watcher.Event.EventType.NodeDeleted, ":node-deleted");
+        map.put(Watcher.Event.EventType.NodeDataChanged, ":node-data-changed");
+        map.put(Watcher.Event.EventType.NodeChildrenChanged, ":node-children-changed");
+    }
+
+    public static String getTypeName(Watcher.Event.EventType type) {
+        return map.get(type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java
new file mode 100644
index 0000000..66dc231
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import org.apache.zookeeper.Watcher;
+
+import java.util.HashMap;
+
+public class ZkKeeperStates {
+
+    private static final HashMap<Watcher.Event.KeeperState, String> map;
+
+    static {
+        map = new HashMap<>();
+
+        map.put(Watcher.Event.KeeperState.AuthFailed, ":auth-failed");
+        map.put(Watcher.Event.KeeperState.SyncConnected, ":connected");
+        map.put(Watcher.Event.KeeperState.Disconnected, ":disconnected");
+        map.put(Watcher.Event.KeeperState.Expired, ":expired");
+    }
+
+    public static String getStateName(Watcher.Event.KeeperState state) {
+        return map.get(state);
+    }
+
+}

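Both lookup classes simply translate ZooKeeper enum constants into Clojure-style keyword strings (e.g. for log messages); a small illustration, with the class name NameSketch being hypothetical:

import org.apache.storm.zookeeper.ZkEventTypes;
import org.apache.storm.zookeeper.ZkKeeperStates;
import org.apache.zookeeper.Watcher;

public class NameSketch {
    public static void main(String[] args) {
        // prints ":node-created"
        System.out.println(ZkEventTypes.getTypeName(Watcher.Event.EventType.NodeCreated));
        // prints ":connected"
        System.out.println(ZkKeeperStates.getStateName(Watcher.Event.KeeperState.SyncConnected));
    }
}
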
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
new file mode 100644
index 0000000..0580f41
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.zookeeper;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeyFilter;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ZookeeperAuthInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class Zookeeper {
+    private static final Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
+
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static final Zookeeper INSTANCE = new Zookeeper();
+    private static Zookeeper _instance = INSTANCE;
+
+    /**
+     * Provide an instance of this class for delegates to use.  To mock out
+     * delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     *
+     * @param u a Zookeeper instance
+     */
+    public static void setInstance(Zookeeper u) {
+        _instance = u;
+    }
+
+    /**
+     * Resets the singleton instance to the default. This is helpful to reset
+     * the class to its original functionality when mocking is no longer
+     * desired.
+     */
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) {
+        return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
+    }
+
+    public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) {
+        return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
+    }
+
+    public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) {
+        return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
+    }
+
+    public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+        return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
+    }
+
+    public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+        CuratorFramework fk;
+        if (authConf != null) {
+            fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));
+        } else {
+            fk = Utils.newCurator(conf, servers, port, root);
+        }
+
+        fk.getCuratorListenable().addListener(new CuratorListener() {
+            @Override
+            public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
+                if (e.getType().equals(CuratorEventType.WATCHED)) {
+                    WatchedEvent event = e.getWatchedEvent();
+                    watcher.execute(event.getState(), event.getType(), event.getPath());
+                }
+            }
+        });
+        fk.start();
+        return fk;
+    }
+
+    /**
+     * Connects to ZK and registers the given watcher callback for watch events.
+     *
+     * @return a started CuratorFramework client
+     */
+    public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
+        return mkClientImpl(conf, servers, port, root, watcher, null);
+    }
+
+    public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) {
+        String ret = null;
+        try {
+            String npath = normalizePath(path);
+            ret = zk.create().creatingParentsIfNeeded().withMode(mode).withACL(acls).forPath(npath, data);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        return ret;
+    }
+
+    public static String createNode(CuratorFramework zk, String path, byte[] data, List<ACL> acls){
+        return createNode(zk, path, data, org.apache.zookeeper.CreateMode.PERSISTENT, acls);
+    }
+
+    public static boolean existsNode(CuratorFramework zk, String path, boolean watch){
+        Stat stat = null;
+        try {
+            if (watch) {
+                stat = zk.checkExists().watched().forPath(normalizePath(path));
+            } else {
+                stat = zk.checkExists().forPath(normalizePath(path));
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        return stat != null;
+    }
+
+    public static void deleteNode(CuratorFramework zk, String path){
+        try {
+            String npath = normalizePath(path);
+            if (existsNode(zk, npath, false)) {
+                zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path));
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                // the node was already deleted concurrently; nothing to do
+                LOG.info("delete {} failed.", path, e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    public static void mkdirs(CuratorFramework zk, String path, List<ACL> acls) {
+        _instance.mkdirsImpl(zk, path, acls);
+    }
+
+    public void mkdirsImpl(CuratorFramework zk, String path, List<ACL> acls) {
+        String npath = normalizePath(path);
+        if (npath.equals("/")) {
+            return;
+        }
+        if (existsNode(zk, npath, false)) {
+            return;
+        }
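+        // single placeholder byte stored as the data of each created node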
+        byte[] byteArray = new byte[1];
+        byteArray[0] = (byte) 7;
+        try {
+            createNode(zk, npath, byteArray, org.apache.zookeeper.CreateMode.PERSISTENT, acls);
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
+                // this can happen when multiple clients are doing mkdir at the same time
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    public static void syncPath(CuratorFramework zk, String path){
+        try {
+            zk.sync().forPath(normalizePath(path));
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public static void addListener(CuratorFramework zk, ConnectionStateListener listener) {
+        zk.getConnectionStateListenable().addListener(listener);
+    }
+
+    public static byte[] getData(CuratorFramework zk, String path, boolean watch){
+        try {
+            String npath = normalizePath(path);
+            if (existsNode(zk, npath, watch)) {
+                if (watch) {
+                    return zk.getData().watched().forPath(npath);
+                } else {
+                    return zk.getData().forPath(npath);
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                // this is fine b/c we still have a watch from the successful exists call
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+        return null;
+    }
+
+    public static Integer getVersion(CuratorFramework zk, String path, boolean watch) throws Exception {
+        String npath = normalizePath(path);
+        Stat stat = null;
+        if (existsNode(zk, npath, watch)) {
+            if (watch) {
+                stat = zk.checkExists().watched().forPath(npath);
+            } else {
+                stat = zk.checkExists().forPath(npath);
+            }
+        }
+        return stat == null ? null : Integer.valueOf(stat.getVersion());
+    }
+
+    public static List<String> getChildren(CuratorFramework zk, String path, boolean watch) {
+        try {
+            String npath = normalizePath(path);
+            if (watch) {
+                return zk.getChildren().watched().forPath(npath);
+            } else {
+                return zk.getChildren().forPath(npath);
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    // Deletes the child znodes of parentPath whose names start with the given
+    // nimbus host-port information.
+    public static void deleteNodeBlobstore(CuratorFramework zk, String parentPath, String hostPortInfo){
+        String normalizedParentPath = normalizePath(parentPath);
+        List<String> childPathList = null;
+        if (existsNode(zk, normalizedParentPath, false)) {
+            childPathList = getChildren(zk, normalizedParentPath, false);
+            for (String child : childPathList) {
+                if (child.startsWith(hostPortInfo)) {
+                    LOG.debug("deleteNode child {}", child);
+                    deleteNode(zk, normalizedParentPath + "/" + child);
+                }
+            }
+        }
+    }
+
+    public static Stat setData(CuratorFramework zk, String path, byte[] data){
+        try {
+            String npath = normalizePath(path);
+            return zk.setData().forPath(npath, data);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public static boolean exists(CuratorFramework zk, String path, boolean watch){
+        return existsNode(zk, path, watch);
+    }
+
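+    /**
+     * Starts an in-process ZooKeeper server rooted at localdir. If port is
+     * null, ports are probed starting at 2000 until one can be bound;
+     * otherwise only the given port is tried. Returns [boundPort, factory].
+     */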
+    public static List mkInprocessZookeeper(String localdir, Integer port) throws Exception {
+        File localfile = new File(localdir);
+        ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000);
+        NIOServerCnxnFactory factory = null;
+        int report = 2000;
+        int limitPort = 65535;
+        if (port != null) {
+            report = port;
+            limitPort = port;
+        }
+        while (true) {
+            try {
+                factory = new NIOServerCnxnFactory();
+                factory.configure(new InetSocketAddress(report), 0);
+                break;
+            } catch (BindException e) {
+                report++;
+                if (report > limitPort) {
+                    throw new RuntimeException("No port is available to launch an inprocess zookeeper");
+                }
+            }
+        }
+        LOG.info("Starting inprocess zookeeper at port {} and dir {}", report, localdir);
+        factory.startup(zk);
+        return Arrays.asList((Object) new Long(report), (Object) factory);
+    }
+
+    public static void shutdownInprocessZookeeper(NIOServerCnxnFactory handle) {
+        handle.shutdown();
+    }
+
+    public static NimbusInfo toNimbusInfo(Participant participant) {
+        String id = participant.getId();
+        if (StringUtils.isBlank(id)) {
+            throw new RuntimeException("No nimbus leader participant host found. Have you started your nimbus hosts?");
+        }
+        NimbusInfo nimbusInfo = NimbusInfo.parse(id);
+        nimbusInfo.setLeader(participant.isLeader());
+        return nimbusInfo;
+    }
+
+    // Leader latch listener that will be invoked when we either gain or lose leadership
+    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
+        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        return new LeaderLatchListener() {
+            @Override
+            public void isLeader() {
+                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>() {
+                    @Override
+                    public String filter(String key) {
+                        return ConfigUtils.getIdFromBlobKey(key);
+                    }
+                });
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, localTopologyIds);
+                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology [{}]",
+                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                        generateJoinedString(diffTopology));
+
+                if (diffTopology.isEmpty()) {
+                    LOG.info("Accepting leadership; all active topologies are available locally.");
+                } else {
+                    LOG.info("Code for some active topologies is not available locally; giving up leadership.");
+                    try {
+                        leaderLatch.close();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void notLeader() {
+                LOG.info("{} lost leadership.", hostName);
+            }
+
+            private String generateJoinedString(Set<String> activeTopologyIds) {
+                return Joiner.on(",").join(activeTopologyIds);
+            }
+        };
+    }
+
+    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, blobStore);
+    }
+
+    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
+        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
+        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
+        String id = NimbusInfo.fromConf(conf).toHostPortString();
+        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
+        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
+                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
+            leaderLatchListenerAtomicReference, blobStore);
+    }
+
+    /**
+     * Get the data along with a version
+     * @param zk the zk instance to use
+     * @param path the path to get it from
+     * @param watch should a watch be enabled
+     * @return null if no data is found, else the data with the version.
+     */
+    public static VersionedData<byte[]> getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
+        VersionedData<byte[]> data = null;
+        try {
+            byte[] bytes = null;
+            Stat stats = new Stat();
+            String npath = normalizePath(path);
+            if (existsNode(zk, npath, watch)) {
+                if (watch) {
+                    bytes = zk.getData().storingStatIn(stats).watched().forPath(npath);
+                } else {
+                    bytes = zk.getData().storingStatIn(stats).forPath(npath);
+                }
+                if (bytes != null) {
+                    int version = stats.getVersion();
+                    data = new VersionedData<>(version, bytes);
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                // this is fine b/c we still have a watch from the successful exists call
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+        return data;
+    }
+
+    public static List<String> tokenizePath(String path) {
+        String[] toks = path.split("/");
+        List<String> rtn = new ArrayList<>();
+        for (String str : toks) {
+            if (!str.isEmpty()) {
+                rtn.add(str);
+            }
+        }
+        return rtn;
+    }
+
+    public static String parentPath(String path) {
+        List<String> toks = Zookeeper.tokenizePath(path);
+        int size = toks.size();
+        if (size > 0) {
+            toks.remove(size - 1);
+        }
+        return Zookeeper.toksToPath(toks);
+    }
+
+    public static String toksToPath(List<String> toks) {
+        StringBuilder buff = new StringBuilder();
+        buff.append("/");
+        int size = toks.size();
+        for (int i = 0; i < size; i++) {
+            buff.append(toks.get(i));
+            if (i < (size - 1)) {
+                buff.append("/");
+            }
+        }
+        return buff.toString();
+    }
+
+    public static String normalizePath(String path) {
+        return toksToPath(tokenizePath(path));
+    }
+}

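A minimal sketch of the pure path helpers added above; these need no live ZooKeeper connection, and the class name PathSketch is illustrative only:

import java.util.List;

import org.apache.storm.zookeeper.Zookeeper;

public class PathSketch {
    public static void main(String[] args) {
        // Empty segments are dropped when tokenizing: [storm, supervisors, node1]
        List<String> toks = Zookeeper.tokenizePath("/storm//supervisors/node1");

        // normalizePath rebuilds a canonical '/'-separated absolute path: "/storm/supervisors"
        String normalized = Zookeeper.normalizePath("storm//supervisors/");

        // parentPath drops the last path segment: "/storm/supervisors"
        String parent = Zookeeper.parentPath("/storm/supervisors/node1");

        System.out.println(toks + " " + normalized + " " + parent);
    }
}
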
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/native/worker-launcher/impl/main.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/main.c b/storm-core/src/native/worker-launcher/impl/main.c
index a51f9f9..b14db45 100644
--- a/storm-core/src/native/worker-launcher/impl/main.c
+++ b/storm-core/src/native/worker-launcher/impl/main.c
@@ -155,7 +155,23 @@ int main(int argc, char **argv) {
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
-    exit_code = setup_stormdist_dir(argv[optind]);
+    exit_code = setup_dir_permissions(argv[optind], 0);
+  } else if (strcasecmp("artifacts-dir", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for artifacts-dir\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = setup_dir_permissions(argv[optind], 1);
+  } else if (strcasecmp("blob", command) == 0) {
+    if (argc != 4) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for blob\n",
+              argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    exit_code = setup_dir_permissions(argv[optind], 0);
   } else if (strcasecmp("rmr", command) == 0) {
     if (argc != 4) {
       fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for rmr\n",
@@ -173,7 +189,7 @@ int main(int argc, char **argv) {
       return INVALID_ARGUMENT_NUMBER;
     }
     working_dir = argv[optind++];
-    exit_code = setup_stormdist_dir(working_dir);
+    exit_code = setup_dir_permissions(working_dir, 1);
     if (exit_code == 0) {
       exit_code = exec_as_user(working_dir, argv[optind]);
     }

