storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [04/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:38 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
new file mode 100644
index 0000000..d3e3925
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+    /**
+     * A future that has already completed.
+     */
+    private static class AllDoneFuture implements Future<Void> {
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+        @Override
+        public Void get() {
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit) {
+            return null;
+        }
+
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+
+    // NOTE: every public method is synchronized on this instance; the pending
+    // maps below are only ever touched while holding that monitor.
+    private final Localizer _localizer;
+    // single-threaded pool, so all download callables run one at a time
+    private final ExecutorService _execService;
+    private final boolean _isLocalMode;
+    private final Map<String, Object> _conf;
+    // topology-id -> in-flight/finished download of the base blobs (jar/code/conf)
+    private final Map<String, LocalDownloadedResource> _basicPending;
+    // topology-id -> in-flight/finished download of the topology's other blobs
+    private final Map<String, LocalDownloadedResource> _blobPending;
+    private final AdvancedFSOps _fsOps;
+
+    /**
+     * Callable that downloads a topology's base blobs (storm.jar, storm code,
+     * storm conf) into a unique tmp dir and then moves the dir into the
+     * supervisor dist root, atomically when the filesystem supports it.
+     */
+    private class DownloadBaseBlobsDistributed implements Callable<Void> {
+        protected final String _topologyId;
+        protected final File _stormRoot;
+        
+        public DownloadBaseBlobsDistributed(String topologyId) throws IOException {
+            _topologyId = topologyId;
+            _stormRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf, _topologyId));
+        }
+        
+        // Fetch the three base blobs as the supervisor, then pull the
+        // resources subdir out of the downloaded jar.
+        protected void downloadBaseBlobs(File tmproot) throws Exception {
+            String stormJarKey = ConfigUtils.masterStormJarKey(_topologyId);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
+            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            String jarPath = ConfigUtils.supervisorStormJarPath(tmproot.getAbsolutePath());
+            String codePath = ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath());
+            String confPath = ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath());
+            _fsOps.forceMkdir(tmproot);
+            _fsOps.restrictDirectoryPermissions(tmproot);
+            ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(_conf);
+            try {
+                Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
+                Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
+                Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
+            } finally {
+                blobStore.shutdown();
+            }
+            Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
+        }
+        
+        @Override
+        public Void call() throws Exception {
+            try {
+                // If the dist root already exists we either trust it (atomic
+                // move means it is complete) or treat it as possibly partial
+                // and re-download from scratch.
+                if (_fsOps.fileExists(_stormRoot)) {
+                    if (!_fsOps.supportsAtomicDirectoryMove()) {
+                        LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
+                        _fsOps.deleteIfExists(_stormRoot);
+                    } else {
+                        LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+                        return null;
+                    }
+                }
+                // download into a unique tmp dir; only keep the result if
+                // everything below succeeds
+                boolean deleteAll = true;
+                String tmproot = ConfigUtils.supervisorTmpDir(_conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+                File tr = new File(tmproot);
+                try {
+                    downloadBaseBlobs(tr);
+                    _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
+                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
+                    deleteAll = false;
+                } finally {
+                    if (deleteAll) {
+                        LOG.warn("Failed to download basic resources for topology-id {}", _topologyId);
+                        _fsOps.deleteIfExists(tr);
+                        _fsOps.deleteIfExists(_stormRoot);
+                    }
+                }
+                return null;
+            } catch (Exception e) {
+                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
+                throw e;
+            }
+        }
+    }
+    
+    /**
+     * Local-mode variant: reads storm code/conf straight from the nimbus blob
+     * store and copies the resources dir from the classpath instead of
+     * extracting it from a downloaded storm.jar.
+     */
+    private class DownloadBaseBlobsLocal extends DownloadBaseBlobsDistributed {
+
+        public DownloadBaseBlobsLocal(String topologyId) throws IOException {
+            super(topologyId);
+        }
+        
+        @Override
+        protected void downloadBaseBlobs(File tmproot) throws Exception {
+            _fsOps.forceMkdir(tmproot);
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(_topologyId);
+            String stormConfKey = ConfigUtils.masterStormConfKey(_topologyId);
+            File codePath = new File(ConfigUtils.supervisorStormCodePath(tmproot.getAbsolutePath()));
+            File confPath = new File(ConfigUtils.supervisorStormConfPath(tmproot.getAbsolutePath()));
+            BlobStore blobStore = Utils.getNimbusBlobStore(_conf, null);
+            try {
+                try (OutputStream codeOutStream = _fsOps.getOutputStream(codePath)){
+                    blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
+                }
+                try (OutputStream confOutStream = _fsOps.getOutputStream(confPath)) {
+                    blobStore.readBlobTo(stormConfKey, confOutStream, null);
+                }
+            } finally {
+                blobStore.shutdown();
+            }
+
+            // resources may live in a jar on the classpath or as a plain
+            // directory; handle both
+            ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+            String resourcesJar = AsyncLocalizer.resourcesJar();
+            URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
+
+            String targetDir = tmproot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+
+            if (resourcesJar != null) {
+                LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
+                Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
+            } else if (url != null) {
+                LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
+                if ("jar".equals(url.getProtocol())) {
+                    JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
+                    Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, new File(targetDir));
+                } else {
+                    _fsOps.copyDirectory(new File(url.getFile()), new File(targetDir));
+                }
+            }
+        }
+    }
+    
+    /**
+     * Callable that downloads the blobs named in the topology conf's
+     * blobstore map plus any dependency jars/artifacts from the topology,
+     * and symlinks them under the topology's dist root.
+     */
+    private class DownloadBlobs implements Callable<Void> {
+        private final String _topologyId;
+
+        public DownloadBlobs(String topologyId) {
+            _topologyId = topologyId;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            try {
+                String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+                Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+
+                @SuppressWarnings("unchecked")
+                Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+                String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+
+                List<LocalResource> localResourceList = new ArrayList<>();
+                if (blobstoreMap != null) {
+                    List<LocalResource> tmp = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+                    if (tmp != null) {
+                        localResourceList.addAll(tmp);
+                    }
+                }
+
+                // dependency jars/artifacts are downloaded like any other blob
+                StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _fsOps);
+                List<String> dependencies = new ArrayList<>();
+                if (stormCode.is_set_dependency_jars()) {
+                    dependencies.addAll(stormCode.get_dependency_jars());
+                }
+                if (stormCode.is_set_dependency_artifacts()) {
+                    dependencies.addAll(stormCode.get_dependency_artifacts());
+                }
+                for (String dependency : dependencies) {
+                    localResourceList.add(new LocalResource(dependency, false));
+                }
+
+                if (!localResourceList.isEmpty()) {
+                    File userDir = _localizer.getLocalUserFileCacheDir(user);
+                    if (!_fsOps.fileExists(userDir)) {
+                        _fsOps.forceMkdir(userDir);
+                    }
+                    List<LocalizedResource> localizedResources = _localizer.getBlobs(localResourceList, user, topoName, userDir);
+                    _fsOps.setupBlobPermissions(userDir, user);
+                    for (LocalizedResource localizedResource : localizedResources) {
+                        String keyName = localizedResource.getKey();
+                        //The sym link we are pointing to
+                        File rsrcFilePath = new File(localizedResource.getCurrentSymlinkPath());
+
+                        // symlink name defaults to the blob key, unless the
+                        // blobstore map supplies a "localname" override
+                        String symlinkName = null;
+                        if (blobstoreMap != null) {
+                            Map<String, Object> blobInfo = blobstoreMap.get(keyName);
+                            if (blobInfo != null && blobInfo.containsKey("localname")) {
+                                symlinkName = (String) blobInfo.get("localname");
+                            } else {
+                                symlinkName = keyName;
+                            }
+                        } else {
+                            // all things are from dependencies
+                            symlinkName = keyName;
+                        }
+                        _fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath);
+                    }
+                }
+
+                return null;
+            } catch (Exception e) {
+                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
+                throw e;
+            }
+        }
+    }
+    
+    //Visible for testing
+    AsyncLocalizer(Map<String, Object> conf, Localizer localizer, AdvancedFSOps ops) {
+        _conf = conf;
+        _isLocalMode = ConfigUtils.isLocalMode(conf);
+        _localizer = localizer;
+        _execService = Executors.newFixedThreadPool(1,  
+                new ThreadFactoryBuilder()
+                .setNameFormat("Async Localizer")
+                .build());
+        _basicPending = new HashMap<>();
+        _blobPending = new HashMap<>();
+        _fsOps = ops;
+    }
+    
+    public AsyncLocalizer(Map<String, Object> conf, Localizer localizer) {
+        this(conf, localizer, AdvancedFSOps.make(conf));
+    }
+
+    @Override
+    public synchronized Future<Void> requestDownloadBaseTopologyBlobs(final LocalAssignment assignment, final int port) throws IOException {
+        final String topologyId = assignment.get_topology_id();
+        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        // first request for a topology starts the download; later requests
+        // for other ports share the same pending future
+        if (localResource == null) {
+            Callable<Void> c;
+            if (_isLocalMode) {
+                c = new DownloadBaseBlobsLocal(topologyId);
+            } else {
+                c = new DownloadBaseBlobsDistributed(topologyId);
+            }
+            localResource = new LocalDownloadedResource(_execService.submit(c));
+            _basicPending.put(topologyId, localResource);
+        }
+        Future<Void> ret = localResource.reserve(port, assignment);
+        LOG.debug("Reserved basic {} {}", topologyId, localResource);
+        return ret;
+    }
+
+    /**
+     * Find the first jar on the current classpath that contains the
+     * resources subdir, or null if there is none.
+     */
+    private static String resourcesJar() throws IOException {
+        String path = Utils.currentClasspath();
+        if (path == null) {
+            return null;
+        }
+        
+        for (String jpath : path.split(File.pathSeparator)) {
+            if (jpath.endsWith(".jar")) {
+                if (Utils.zipDoesContainDir(jpath, ConfigUtils.RESOURCES_SUBDIR)) {
+                    return jpath;
+                }
+            }
+        }
+        return null;
+    }
+    
+    @Override
+    public synchronized void recoverRunningTopology(LocalAssignment assignment, int port) {
+        final String topologyId = assignment.get_topology_id();
+        // files are assumed to be on disk already, so register references
+        // against an already-completed future
+        LocalDownloadedResource localResource = _basicPending.get(topologyId);
+        if (localResource == null) {
+            localResource = new LocalDownloadedResource(new AllDoneFuture());
+            _basicPending.put(topologyId, localResource);
+        }
+        localResource.reserve(port, assignment);
+        LOG.debug("Recovered basic {} {}", topologyId, localResource);
+        
+        localResource = _blobPending.get(topologyId);
+        if (localResource == null) {
+            localResource = new LocalDownloadedResource(new AllDoneFuture());
+            _blobPending.put(topologyId, localResource);
+        }
+        localResource.reserve(port, assignment);
+        LOG.debug("Recovered blobs {} {}", topologyId, localResource);
+    }
+    
+    @Override
+    public synchronized Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port) {
+        final String topologyId = assignment.get_topology_id();
+        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        if (localResource == null) {
+            Callable<Void> c = new DownloadBlobs(topologyId);
+            localResource = new LocalDownloadedResource(_execService.submit(c));
+            _blobPending.put(topologyId, localResource);
+        }
+        Future<Void> ret = localResource.reserve(port, assignment);
+        LOG.debug("Reserved blobs {} {}", topologyId, localResource);
+        return ret;
+    }
+
+    @Override
+    public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
+        final String topologyId = assignment.get_topology_id();
+        LOG.debug("Releasing slot for {} {}", topologyId, port);
+        // first drop the reference on the topology blobs; when the last
+        // reference goes away, also release each blob in the localizer
+        LocalDownloadedResource localResource = _blobPending.get(topologyId);
+        if (localResource == null || !localResource.release(port, assignment)) {
+            LOG.warn("Released blob reference {} {} for something that we didn't have {}", topologyId, port, localResource);
+        } else if (localResource.isDone()){
+            LOG.info("Released blob reference {} {} Cleaning up BLOB references...", topologyId, port);
+            _blobPending.remove(topologyId);
+            Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, topologyId);
+            @SuppressWarnings("unchecked")
+            Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+            if (blobstoreMap != null) {
+                String user = (String) topoConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+                String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
+                
+                for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
+                    String key = entry.getKey();
+                    Map<String, Object> blobInfo = entry.getValue();
+                    try {
+                        _localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo));
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+        } else {
+            LOG.debug("Released blob reference {} {} still waiting on {}", topologyId, port, localResource);
+        }
+        
+        // then drop the reference on the base blobs; when the last reference
+        // goes away, delete the topology's dist root
+        localResource = _basicPending.get(topologyId);
+        if (localResource == null || !localResource.release(port, assignment)) {
+            LOG.warn("Released basic reference {} {} for something that we didn't have {}", topologyId, port, localResource);
+        } else if (localResource.isDone()){
+            LOG.info("Released blob reference {} {} Cleaning up basic files...", topologyId, port);
+            _basicPending.remove(topologyId);
+            String path = ConfigUtils.supervisorStormDistRoot(_conf, topologyId);
+            _fsOps.deleteIfExists(new File(path), null, "rmr "+topologyId);
+        } else {
+            LOG.debug("Released basic reference {} {} still waiting on {}", topologyId, port, localResource);
+        }
+    }
+
+    @Override
+    public synchronized void cleanupUnusedTopologies() throws IOException {
+        File distRoot = new File(ConfigUtils.supervisorStormDistRoot(_conf));
+        LOG.info("Cleaning up unused topologies in {}", distRoot);
+        File[] children = distRoot.listFiles();
+        if (children != null) {
+            for (File topoDir : children) {
+                // dir names are URL-encoded topology ids
+                String topoId = URLDecoder.decode(topoDir.getName(), "UTF-8");
+                if (_basicPending.get(topoId) == null && _blobPending.get(topoId) == null) {
+                    _fsOps.deleteIfExists(topoDir, null, "rmr " + topoId);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        // does not wait for in-flight downloads to complete
+        _execService.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
new file mode 100644
index 0000000..7105095
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/ILocalizer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+import org.apache.storm.generated.LocalAssignment;
+
+/**
+ * Download blobs from the blob store and keep them up to date.
+ */
+public interface ILocalizer {
+
+    /**
+     * Recover a running topology by incrementing references for what it has already downloaded.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running in.
+     */
+    void recoverRunningTopology(LocalAssignment assignment, int port);
+    
+    /**
+     * Download storm.jar, storm.conf, and storm.ser for this topology if not done so already,
+     * and inc a reference count on them.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     * @throws IOException on error 
+     */
+    Future<Void> requestDownloadBaseTopologyBlobs(LocalAssignment assignment, int port) throws IOException;
+
+    /**
+     * Download the blobs for this topology (reading in list in from the config)
+     * and inc reference count for these blobs.
+     * PRECONDITION: requestDownloadBaseTopologyBlobs has completed downloading.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @return a future to let you know when they are done.
+     */
+    Future<Void> requestDownloadTopologyBlobs(LocalAssignment assignment, int port);
+    
+    /**
+     * dec reference count on all blobs associated with this topology.
+     * @param assignment the assignment the resources are for
+     * @param port the port the topology is running on
+     * @throws IOException on any error
+     */
+    void releaseSlotFor(LocalAssignment assignment, int port) throws IOException;
+    
+    /**
+     * Clean up any topologies that are not in use right now.
+     * @throws IOException on any error.
+     */
+    void cleanupUnusedTopologies() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
new file mode 100644
index 0000000..570c951
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalDownloadedResource.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A future-backed resource shared by several (port, assignment) slots.
+ * Tracks reference counts so the resource is only considered done once
+ * every slot that reserved it has released it.  All mutating methods are
+ * synchronized on this instance.
+ */
+public class LocalDownloadedResource {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDownloadedResource.class);
+    /**
+     * Wrapper that delegates to another future but refuses cancellation, so
+     * one slot cannot cancel a download other slots are still waiting on.
+     */
+    private static class NoCancelFuture<T> implements Future<T> {
+        private final Future<T> _wrapped;
+        
+        public NoCancelFuture(Future<T> wrapped) {
+            _wrapped = wrapped;
+        }
+        
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            //cancel not currently supported
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return _wrapped.isDone();
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            return _wrapped.get();
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return _wrapped.get(timeout, unit);
+        }
+    }
+    /**
+     * Value object identifying one reference holder: a worker port plus the
+     * assignment running on it.  Used as a set element, so it implements
+     * equals/hashCode consistently.
+     */
+    private static class PortNAssignment {
+        private final int _port;
+        private final LocalAssignment _assignment;
+        
+        public PortNAssignment(int port, LocalAssignment assignment) {
+            _port = port;
+            _assignment = assignment;
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof PortNAssignment)) {
+                return false;
+            }
+            PortNAssignment pna = (PortNAssignment) other;
+            return pna._port == _port && _assignment.equals(pna._assignment); 
+        }
+        
+        @Override
+        public int hashCode() {
+            return (17 * _port) + _assignment.hashCode();
+        }
+        
+        @Override
+        public String toString() {
+            return "{"+ _port + " " + _assignment +"}";
+        }
+    }
+    private final Future<Void> _pending;
+    // current reference holders; guarded by this instance's monitor
+    private final Set<PortNAssignment> _references;
+    private boolean _isDone;
+    
+    
+    public LocalDownloadedResource(Future<Void> pending) {
+        _pending = new NoCancelFuture<>(pending);
+        _references = new HashSet<>();
+        _isDone = false;
+    }
+
+    /**
+     * Reserve the resources
+     * @param port the port this is for
+     * @param la the assignment this is for
+     * @return a future that can be used to track it being downloaded.
+     */
+    public synchronized Future<Void> reserve(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        if (!_references.add(pna)) {
+            LOG.warn("Resources {} already reserved {} for this topology", pna, _references);
+        }
+        return _pending;
+    }
+    
+    /**
+     * Release a port from the reference count, and update isDone if all is done.
+     * @param port the port to release
+     * @param la the assignment to release
+     * @return true if the port was being counted else false
+     */
+    public synchronized boolean release(int port, LocalAssignment la) {
+        PortNAssignment pna = new PortNAssignment(port, la);
+        boolean ret = _references.remove(pna);
+        if (ret && _references.isEmpty()) {
+            _isDone = true;
+        }
+        return ret;
+    }
+    
+    /**
+     * Is this has been cleaned up completely.
+     * @return true if it is done else false
+     */
+    public synchronized boolean isDone() {
+        return _isDone;
+    }
+
+    // synchronized: _references is a plain HashSet mutated under this
+    // monitor by reserve/release; iterating it unguarded could throw
+    // ConcurrentModificationException or print an inconsistent view.
+    @Override
+    public synchronized String toString() {
+        return _references.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 380e777..9f42b47 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -93,7 +93,7 @@ public class LocalizedResourceRetentionSet {
           i.remove();
         } else {
           // since it failed to delete add it back so it gets retried
-          set.addResource(resource.getKey(), resource, resource.isUncompressed());
+          set.add(resource.getKey(), resource, resource.isUncompressed());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
index b5f00c3..62d5b2f 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/LocalizedResourceSet.java
@@ -57,7 +57,7 @@ public class LocalizedResourceSet {
     return _localrsrcFiles.get(name);
   }
 
-  public void updateResource(String resourceName, LocalizedResource updatedResource,
+  public void putIfAbsent(String resourceName, LocalizedResource updatedResource,
                             boolean uncompress) {
     if (uncompress) {
       _localrsrcArchives.putIfAbsent(resourceName, updatedResource);
@@ -66,7 +66,7 @@ public class LocalizedResourceSet {
     }
   }
 
-  public void addResource(String resourceName, LocalizedResource newResource, boolean uncompress) {
+  public void add(String resourceName, LocalizedResource newResource, boolean uncompress) {
     if (uncompress) {
       _localrsrcArchives.put(resourceName, newResource);
     } else {
@@ -76,9 +76,9 @@ public class LocalizedResourceSet {
 
+  /** Returns true when a resource with this name is tracked: checks the
+   *  archives map when {@code uncompress} is set, the files map otherwise. */
   public boolean exists(String resourceName, boolean uncompress) {
     if (uncompress) {
-      return (_localrsrcArchives.get(resourceName) != null);
+      return _localrsrcArchives.containsKey(resourceName);
     }
-    return (_localrsrcFiles.get(resourceName) != null);
+    return _localrsrcFiles.containsKey(resourceName);
   }
 
   public boolean remove(LocalizedResource resource) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
index b91cecb..0135397 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/Localizer.java
@@ -63,20 +63,6 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
  */
 public class Localizer {
   public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-
-  private Map _conf;
-  private int _threadPoolSize;
-  // thread pool for initial download
-  private ExecutorService _execService;
-  // thread pool for updates
-  private ExecutorService _updateExecService;
-  private int _blobDownloadRetries;
-
-  // track resources - user to resourceSet
-  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
-      ConcurrentHashMap<String, LocalizedResourceSet>();
-
-  private String _localBaseDir;
   public static final String USERCACHE = "usercache";
   public static final String FILECACHE = "filecache";
 
@@ -85,13 +71,29 @@ public class Localizer {
   public static final String ARCHIVESDIR = "archives";
 
   private static final String TO_UNCOMPRESS = "_tmp_";
+  
+  
+  
+  private final Map<String, Object> _conf;
+  private final int _threadPoolSize;
+  // thread pool for initial download
+  private final ExecutorService _execService;
+  // thread pool for updates
+  private final ExecutorService _updateExecService;
+  private final int _blobDownloadRetries;
+
+  // track resources - user to resourceSet
+  private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new
+      ConcurrentHashMap<String, LocalizedResourceSet>();
+
+  private final String _localBaseDir;
 
   // cleanup
   private long _cacheTargetSize;
   private long _cacheCleanupPeriod;
   private ScheduledExecutorService _cacheCleanupService;
 
-  public Localizer(Map conf, String baseDir) {
+  public Localizer(Map<String, Object> conf, String baseDir) {
     _conf = conf;
     _localBaseDir = baseDir;
     // default cache size 10GB, converted to Bytes
@@ -189,7 +191,7 @@ public class Localizer {
         LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path);
         LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path,
             uncompress);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, uncompress);
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress);
       }
     }
   }
@@ -369,7 +371,7 @@ public class Localizer {
           if (newlrsrcSet == null) {
             newlrsrcSet = newSet;
           }
-          newlrsrcSet.updateResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+          newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
           results.add(lrsrc);
         }
         catch (ExecutionException e) {
@@ -451,7 +453,7 @@ public class Localizer {
       for (Future<LocalizedResource> futureRsrc: futures) {
         LocalizedResource lrsrc = futureRsrc.get();
         lrsrc.addReference(topo);
-        lrsrcSet.addResource(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
+        lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed());
         results.add(lrsrc);
       }
     } catch (ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
new file mode 100644
index 0000000..d433920
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.codahale.metrics.*;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unchecked")
+public class StormMetricsRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
+    public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry();
+
+    public static Meter registerMeter(String name) {
+        Meter meter = new Meter();
+        return register(name, meter);
+    }
+
+    // TODO: should replace Callable to Gauge<Integer> when nimbus.clj is translated to java
+    public static Gauge<Integer> registerGauge(final String name, final Callable fn) {
+        Gauge<Integer> gauge = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                try {
+                    return (Integer) fn.call();
+                } catch (Exception e) {
+                    LOG.error("Error getting gauge value for {}", name, e);
+                }
+                return 0;
+            }
+        };
+        return register(name, gauge);
+    }
+
+    public static Histogram registerHistogram(String name, Reservoir reservoir) {
+        Histogram histogram = new Histogram(reservoir);
+        return register(name, histogram);
+    }
+
+    public static void startMetricsReporters(Map stormConf) {
+        for (PreparableReporter reporter : MetricsUtils.getPreparableReporters(stormConf)) {
+            startMetricsReporter(reporter, stormConf);
+        }
+    }
+
+    private static void startMetricsReporter(PreparableReporter reporter, Map stormConf) {
+        reporter.prepare(StormMetricsRegistry.DEFAULT_REGISTRY, stormConf);
+        reporter.start();
+        LOG.info("Started statistics report plugin...");
+    }
+
+    private static <T extends Metric> T register(String name, T metric) {
+        T ret;
+        try {
+            ret = DEFAULT_REGISTRY.register(name, metric);
+        } catch (IllegalArgumentException e) {
+            // swallow IllegalArgumentException when the metric exists already
+            ret = (T) DEFAULT_REGISTRY.getMetrics().get(name);
+            if (ret == null) {
+                throw e;
+            } else {
+                LOG.warn("Metric {} has already been registered", name);
+            }
+        }
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
index 3c729ec..e8789df 100644
--- a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
@@ -37,14 +37,14 @@ public interface ILeaderElector extends Closeable {
      * check isLeader() to perform any leadership action. This method can be called
      * multiple times so it needs to be idempotent.
      */
-    void addToLeaderLockQueue();
+    void addToLeaderLockQueue() throws Exception;
 
     /**
      * Removes the caller from the leader lock queue. If the caller is leader
      * also releases the lock. This method can be called multiple times so it needs
      * to be idempotent.
      */
-    void removeFromLeaderLockQueue();
+    void removeFromLeaderLockQueue() throws Exception;
 
     /**
      *
@@ -62,7 +62,7 @@ public interface ILeaderElector extends Closeable {
      *
      * @return list of current nimbus addresses, includes leader.
      */
-    List<NimbusInfo> getAllNimbuses();
+    List<NimbusInfo> getAllNimbuses() throws Exception;
 
     /**
      * Method called to allow for cleanup. once close this object can not be reused.

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index 32d28fe..8415ce3 100644
--- a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -134,6 +134,21 @@ public class SerializationFactory {
         Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
         Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
 
+        /**
+         * "{:a 1  :b 2} -> {1 :a  2 :b}"
+         *
+         * Note: Only one key wins if there are duplicate values.
+         *       Which key wins is indeterminate:
+         * "{:a 1  :b 1} -> {1 :a} *or* {1 :b}"
+         */
+        private static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+            Map<V, K> ret = new HashMap<V, K>();
+            for (Map.Entry<K, V> entry : map.entrySet()) {
+                ret.put(entry.getValue(), entry.getKey());
+            }
+            return ret;
+        }
+
         public IdDictionary(StormTopology topology) {
             List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
             componentNames.addAll(topology.get_bolts().keySet());
@@ -143,7 +158,7 @@ public class SerializationFactory {
                 ComponentCommon common = Utils.getComponentCommon(topology, name);
                 List<String> streams = new ArrayList<>(common.get_streams().keySet());
                 streamNametoId.put(name, idify(streams));
-                streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
+                streamIdToName.put(name, simpleReverseMap(streamNametoId.get(name)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
index 2f06102..0284725 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/FeederSpout.java
@@ -17,17 +17,18 @@
  */
 package org.apache.storm.testing;
 
-import org.apache.storm.topology.OutputFieldsDeclarer;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.InprocMessaging;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
 
 
 public class FeederSpout extends BaseRichSpout {
@@ -51,7 +52,15 @@ public class FeederSpout extends BaseRichSpout {
 
     public void feed(List<Object> tuple, Object msgId) {
         InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
-    }    
+    }
+    
+    public void feedNoWait(List<Object> tuple, Object msgId) {
+        InprocMessaging.sendMessageNoWait(_id, new Values(tuple, msgId));
+    }
+    
+    public void waitForReader() {
+        InprocMessaging.waitForReader(_id);
+    }
     
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         _collector = collector;
@@ -63,17 +72,11 @@ public class FeederSpout extends BaseRichSpout {
 
     public void nextTuple() {
         List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
-        if(toEmit!=null) {
+        if (toEmit!=null) {
             List<Object> tuple = (List<Object>) toEmit.get(0);
             Object msgId = toEmit.get(1);
             
             _collector.emit(tuple, msgId);
-        } else {
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
new file mode 100644
index 0000000..d33dc9c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.testing.staticmocking;
+
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+
+public class MockedSupervisorUtils implements AutoCloseable {
+
+    public MockedSupervisorUtils(SupervisorUtils inst) {
+        SupervisorUtils.setInstance(inst);
+    }
+
+    @Override
+    public void close() throws Exception {
+        SupervisorUtils.resetInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
index 8272b3c..f5d317e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/util/TridentUtils.java
@@ -111,7 +111,7 @@ public class TridentUtils {
         return Utils.thriftSerialize(t);
     }
 
-    public static <T> T thriftDeserialize(Class c, byte[] b) {
+    public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
         return Utils.thriftDeserialize(c,b);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
new file mode 100644
index 0000000..e2be8a7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.validation.ConfigValidation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class ConfigUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);
+    public final static String RESOURCES_SUBDIR = "resources";
+    public final static String NIMBUS_DO_NOT_REASSIGN = "NIMBUS-DO-NOT-REASSIGN";
+    public static final String FILE_SEPARATOR = File.separator;
+
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static ConfigUtils _instance = new ConfigUtils();
+
+    /**
+     * Provide an instance of this class for delegates to use.  To mock out
+     * delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     * @param u a ConfigUtils instance
+     * @return the previously set instance
+     */
+    public static ConfigUtils setInstance(ConfigUtils u) {
+        ConfigUtils oldInstance = _instance;
+        _instance = u;
+        return oldInstance;
+    }
+
+    public static String getLogDir() {
+        String dir;
+        Map conf;
+        if (System.getProperty("storm.log.dir") != null) {
+            dir = System.getProperty("storm.log.dir");
+        } else if ((conf = readStormConfig()).get("storm.log.dir") != null) {
+            dir = String.valueOf(conf.get("storm.log.dir"));
+        } else if (System.getProperty("storm.home") != null) {
+            dir = System.getProperty("storm.home") + FILE_SEPARATOR + "logs";
+        } else {
+            dir = "logs";
+        }
+        try {
+            return new File(dir).getCanonicalPath();
+        } catch (IOException ex) {
+            throw new IllegalArgumentException("Illegal storm.log.dir in conf: " + dir);
+        }
+    }
+
+    public static String clojureConfigName(String name) {
+        return name.toUpperCase().replace("_", "-");
+    }
+
+    // ALL-CONFIGS is only used by executor.clj once, do we want to do it here? TODO
+    public static List<Object> All_CONFIGS() {
+        List<Object> ret = new ArrayList<Object>();
+        Config config = new Config();
+        Class<?> ConfigClass = config.getClass();
+        Field[] fields = ConfigClass.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            try {
+                Object obj = fields[i].get(null);
+                ret.add(obj);
+            } catch (IllegalArgumentException e) {
+                LOG.error(e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        return ret;
+    }
+
+    public static String clusterMode(Map conf) {
+        String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
+        return mode;
+    }
+
+    public static boolean isLocalMode(Map<String, Object> conf) {
+        String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
+        if (mode != null) {
+            if ("local".equals(mode)) {
+                return true;
+            }
+            if ("distributed".equals(mode)) {
+                return false;
+            }
+            throw new IllegalArgumentException("Illegal cluster mode in conf: " + mode);
+        }
+        return true;
+    }
+
+    public static int samplingRate(Map conf) {
+        double rate = Utils.getDouble(conf.get(Config.TOPOLOGY_STATS_SAMPLE_RATE));
+        if (rate != 0) {
+            return (int) (1 / rate);
+        }
+        throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
+    }
+
+    public static Callable<Boolean> evenSampler(final int samplingFreq) {
+        final Random random = new Random();
+
+        return new Callable<Boolean>() {
+            private int curr = -1;
+            private int target = random.nextInt(samplingFreq);
+
+            @Override
+            public Boolean call() throws Exception {
+                curr++;
+                if (curr >= samplingFreq) {
+                    curr = 0;
+                    target = random.nextInt(samplingFreq);
+                }
+                return (curr == target);
+            }
+        };
+    }
+
+    public static Callable<Boolean> mkStatsSampler(Map conf) {
+        return evenSampler(samplingRate(conf));
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static Map<String, Object> readStormConfig() {
+        return _instance.readStormConfigImpl();
+    }
+
+    public Map<String, Object> readStormConfigImpl() {
+        Map<String, Object> conf = Utils.readStormConfig();
+        ConfigValidation.validateFields(conf);
+        return conf;
+    }
+
+    public static Map readYamlConfig(String name, boolean mustExist) {
+        Map conf = Utils.findAndReadConfigFile(name, mustExist);
+        ConfigValidation.validateFields(conf);
+        return conf;
+    }
+
+    public static Map readYamlConfig(String name) {
+        return readYamlConfig(name, true);
+    }
+
+    public static String absoluteStormLocalDir(Map conf) {
+        String stormHome = System.getProperty("storm.home");
+        String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
+        if (localDir == null) {
+            return (stormHome + FILE_SEPARATOR + "storm-local");
+        } else {
+            if (new File(localDir).isAbsolute()) {
+                return localDir;
+            } else {
+                return (stormHome + FILE_SEPARATOR + localDir);
+            }
+        }
+    }
+
+    public static String absoluteHealthCheckDir(Map conf) {
+        String stormHome = System.getProperty("storm.home");
+        String healthCheckDir = (String) conf.get(Config.STORM_HEALTH_CHECK_DIR);
+        if (healthCheckDir == null) {
+            return (stormHome + FILE_SEPARATOR + "healthchecks");
+        } else {
+            if (new File(healthCheckDir).isAbsolute()) {
+                return healthCheckDir;
+            } else {
+                return (stormHome + FILE_SEPARATOR + healthCheckDir);
+            }
+        }
+    }
+
+    public static String masterLocalDir(Map conf) throws IOException {
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPARATOR + "nimbus";
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
+    public static String masterStormJarKey(String topologyId) {
+        return (topologyId + "-stormjar.jar");
+    }
+
+    public static String masterStormCodeKey(String topologyId) {
+        return (topologyId + "-stormcode.ser");
+    }
+
+    public static String masterStormConfKey(String topologyId) {
+        return (topologyId + "-stormconf.ser");
+    }
+
+    public static String masterStormDistRoot(Map conf) throws IOException {
+        String ret = stormDistPath(masterLocalDir(conf));
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
+    public static String masterStormDistRoot(Map conf, String stormId) throws IOException {
+        return (masterStormDistRoot(conf) + FILE_SEPARATOR + stormId);
+    }
+
+    public static String stormDistPath(String stormRoot) {
+        String ret = "";
+        // we do this since to concat a null String will actually concat a "null", which is not the expected: ""
+        if (stormRoot != null) {
+            ret = stormRoot;
+        }
+        return ret + FILE_SEPARATOR + "stormdist";
+    }
+
+    public static Map<String, Object> readSupervisorStormConfGivenPath(Map<String, Object> conf, String stormConfPath) throws IOException {
+        Map<String, Object> ret = new HashMap<>(conf);
+        ret.putAll(Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new File(stormConfPath))));
+        return ret;
+    }
+
+    public static StormTopology readSupervisorStormCodeGivenPath(String stormCodePath, AdvancedFSOps ops) throws IOException {
+        return Utils.deserialize(ops.slurp(new File(stormCodePath)), StormTopology.class);
+    }
+
+    public static String masterStormJarPath(String stormRoot) {
+        return (stormRoot + FILE_SEPARATOR + "stormjar.jar");
+    }
+
+    public static String masterInbox(Map conf) throws IOException {
+        String ret = masterLocalDir(conf) + FILE_SEPARATOR + "inbox";
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
+    public static String masterInimbusDir(Map conf) throws IOException {
+        return (masterLocalDir(conf) + FILE_SEPARATOR + "inimbus");
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static String supervisorLocalDir(Map conf) throws IOException {
+        return _instance.supervisorLocalDirImpl(conf);
+    }
+
+    public String supervisorLocalDirImpl(Map conf) throws IOException {
+        String ret = absoluteStormLocalDir(conf) + FILE_SEPARATOR + "supervisor";
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
+    public static String supervisorIsupervisorDir(Map conf) throws IOException {
+        return (supervisorLocalDir(conf) + FILE_SEPARATOR + "isupervisor");
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static String supervisorStormDistRoot(Map conf) throws IOException {
+        return _instance.supervisorStormDistRootImpl(conf);
+    }
+
+    public String supervisorStormDistRootImpl(Map conf) throws IOException {
+        return stormDistPath(supervisorLocalDir(conf));
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static String supervisorStormDistRoot(Map conf, String stormId) throws IOException {
+        return _instance.supervisorStormDistRootImpl(conf, stormId);
+    }
+
+    public String supervisorStormDistRootImpl(Map conf, String stormId) throws IOException {
+        return supervisorStormDistRoot(conf) + FILE_SEPARATOR + URLEncoder.encode(stormId, "UTF-8");
+    }
+
+    public static String concatIfNotNull(String dir) {
+        String ret = "";
+        // we do this since to concat a null String will actually concat a "null", which is not the expected: ""
+        if (dir != null) {
+            ret = dir;
+        }
+        return ret;
+    }
+
+    public static String supervisorStormJarPath(String stormRoot) {
+        return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.jar");
+    }
+
+    public static String supervisorStormCodePath(String stormRoot) {
+        return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormcode.ser");
+    }
+
+    public static String supervisorStormConfPath(String stormRoot) {
+        return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormconf.ser");
+    }
+
+    public static String supervisorTmpDir(Map conf) throws IOException {
+        String ret = supervisorLocalDir(conf) + FILE_SEPARATOR + "tmp";
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
+    public static String supervisorStormResourcesPath(String stormRoot) {
+        return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + RESOURCES_SUBDIR);
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static LocalState supervisorState(Map conf) throws IOException {
+        return _instance.supervisorStateImpl(conf);
+    }
+
+    public LocalState supervisorStateImpl(Map conf) throws IOException {
+        return new LocalState((supervisorLocalDir(conf) + FILE_SEPARATOR + "localstate"));
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static LocalState nimbusTopoHistoryState(Map conf) throws IOException {
+        return _instance.nimbusTopoHistoryStateImpl(conf);
+    }
+
+    public LocalState nimbusTopoHistoryStateImpl(Map conf) throws IOException {
+        return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"));
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static Map<String, Object> readSupervisorStormConf(Map<String, Object> conf, String stormId) throws IOException {
+        return _instance.readSupervisorStormConfImpl(conf, stormId);
+    }
+
+    public Map<String, Object> readSupervisorStormConfImpl(Map<String, Object> conf, String stormId) throws IOException {
+        String stormRoot = supervisorStormDistRoot(conf, stormId);
+        String confPath = supervisorStormConfPath(stormRoot);
+        return readSupervisorStormConfGivenPath(conf, confPath);
+    }
+
+    public static StormTopology readSupervisorTopology(Map conf, String stormId, AdvancedFSOps ops) throws IOException {
+        return _instance.readSupervisorTopologyImpl(conf, stormId, ops);
+    }
+  
+    public StormTopology readSupervisorTopologyImpl(Map conf, String stormId, AdvancedFSOps ops) throws IOException {
+        String stormRoot = supervisorStormDistRoot(conf, stormId);
+        String topologyPath = supervisorStormCodePath(stormRoot);
+        return readSupervisorStormCodeGivenPath(topologyPath, ops);
+    }
+
+    public static String workerUserRoot(Map conf) {
+        return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers-users");
+    }
+
+    public static String workerUserFile(Map conf, String workerId) {
+        return (workerUserRoot(conf) + FILE_SEPARATOR + workerId);
+    }
+
+    public static String getIdFromBlobKey(String key) {
+        if (key == null) return null;
+        final String STORM_JAR_SUFFIX = "-stormjar.jar";
+        final String STORM_CODE_SUFFIX = "-stormcode.ser";
+        final String STORM_CONF_SUFFIX = "-stormconf.ser";
+
+        String ret = null;
+        if (key.endsWith(STORM_JAR_SUFFIX)) {
+            ret = key.substring(0, key.length() - STORM_JAR_SUFFIX.length());
+        } else if (key.endsWith(STORM_CODE_SUFFIX)) {
+            ret = key.substring(0, key.length() - STORM_CODE_SUFFIX.length());
+        } else if (key.endsWith(STORM_CONF_SUFFIX)) {
+            ret = key.substring(0, key.length() - STORM_CONF_SUFFIX.length());
+        }
+        return ret;
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    public static String workerArtifactsRoot(Map conf) {
+        return _instance.workerArtifactsRootImpl(conf);
+    }
+
+    /**
+     * Resolve the worker-artifacts root: Config.STORM_WORKERS_ARTIFACTS_DIR when set
+     * (a relative value is resolved under the log dir), otherwise "workers-artifacts"
+     * under the log dir.
+     */
+    public String workerArtifactsRootImpl(Map conf) {
+        String artifactsDir = (String)conf.get(Config.STORM_WORKERS_ARTIFACTS_DIR);
+        if (artifactsDir == null) {
+            return (getLogDir() + FILE_SEPARATOR + "workers-artifacts");
+        } else {
+            if (new File(artifactsDir).isAbsolute()) {
+                return artifactsDir;
+            } else {
+                return (getLogDir() + FILE_SEPARATOR + artifactsDir);
+            }
+        }
+    }
+
+    /** Artifacts dir for one topology: &lt;artifacts-root&gt;/&lt;topo-id&gt;. */
+    public static String workerArtifactsRoot(Map conf, String id) {
+        return (workerArtifactsRoot(conf) + FILE_SEPARATOR + id);
+    }
+
+    /** Artifacts dir for one worker: &lt;artifacts-root&gt;/&lt;topo-id&gt;/&lt;port&gt;. */
+    public static String workerArtifactsRoot(Map conf, String id, Integer port) {
+        return (workerArtifactsRoot(conf, id) + FILE_SEPARATOR + port);
+    }
+
+    /** Path of the "worker.pid" file inside a worker's artifacts dir. */
+    public static String workerArtifactsPidPath(Map conf, String id, Integer port) {
+        return (workerArtifactsRoot(conf, id, port) + FILE_SEPARATOR +  "worker.pid");
+    }
+
+    /**
+     * Resolve the worker.yaml metadata file from a path fragment of the form
+     * "&lt;topo-id&gt;&lt;sep&gt;&lt;port&gt;...". Reads the storm config to locate the artifacts root.
+     * NOTE(review): FILE_SEPARATOR is passed to split() as a regex; on Windows the
+     * backslash separator would need quoting — confirm behavior there.
+     */
+    public static File getLogMetaDataFile(String fname) {
+        String[] subStrings = fname.split(FILE_SEPARATOR); // TODO: does this work well on windows?
+        String id = subStrings[0];
+        Integer port = Integer.parseInt(subStrings[1]);
+        return getLogMetaDataFile(Utils.readStormConfig(), id, port);
+    }
+
+    /** The "worker.yaml" metadata file under the worker's artifacts dir. */
+    public static File getLogMetaDataFile(Map conf, String id, Integer port) {
+        String fname = workerArtifactsRoot(conf, id, port) + FILE_SEPARATOR + "worker.yaml";
+        return new File(fname);
+    }
+
+    /** Worker directory &lt;logRoot&gt;/&lt;topo-id&gt;/&lt;port&gt; under an arbitrary root. */
+    public static File getWorkerDirFromRoot(String logRoot, String id, Integer port) {
+        return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
+    }
+
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
+    /** Root directory holding per-worker state; delegates to the instance Impl so tests can mock it. */
+    public static String workerRoot(Map conf) {
+        return _instance.workerRootImpl(conf);
+    }
+
+    /** "workers" directory under the absolute local storm dir. */
+    public String workerRootImpl(Map conf) {
+        return (absoluteStormLocalDir(conf) + FILE_SEPARATOR + "workers");
+    }
+
+    /** State directory for one worker id: &lt;workers-root&gt;/&lt;worker-id&gt;. */
+    public static String workerRoot(Map conf, String id) {
+        return (workerRoot(conf) + FILE_SEPARATOR + id);
+    }
+
+    /** "pids" subdirectory of a worker's state dir. */
+    public static String workerPidsRoot(Map conf, String id) {
+        return (workerRoot(conf, id) + FILE_SEPARATOR + "pids");
+    }
+
+    /** "tmp" subdirectory of a worker's state dir. */
+    public static String workerTmpRoot(Map conf, String id) {
+        return (workerRoot(conf, id) + FILE_SEPARATOR + "tmp");
+    }
+
+
+    /** Path of one pid file inside a worker's "pids" dir. */
+    public static String workerPidPath(Map conf, String id, String pid) {
+        return (workerPidsRoot(conf, id) + FILE_SEPARATOR + pid);
+    }
+    
+    /** Convenience overload taking the pid as a long. */
+    public static String workerPidPath(Map<String, Object> conf, String id, long pid) {
+        return workerPidPath(conf, id, String.valueOf(pid));
+    }
+
+    /** "heartbeats" subdirectory of a worker's state dir (backs its LocalState). */
+    public static String workerHeartbeatsRoot(Map conf, String id) {
+        return (workerRoot(conf, id) + FILE_SEPARATOR + "heartbeats");
+    }
+
+    /** Open the LocalState backed by the worker's heartbeats dir. @throws IOException on failure to open the backing store */
+    public static LocalState workerState(Map conf, String id) throws IOException {
+        return new LocalState(workerHeartbeatsRoot(conf, id));
+    }
+
+    /**
+     * If the "java.security.auth.login.config" system property is set, copy it into
+     * the given conf under the same key. Mutates and returns the conf that was passed in.
+     */
+    public static Map overrideLoginConfigWithSystemProperty(Map conf) { // mutates and returns the given conf
+        String loginConfFile = System.getProperty("java.security.auth.login.config");
+        if (loginConfFile != null) {
+             conf.put("java.security.auth.login.config", loginConfFile);
+        }
+        return conf;
+    }
+
+    /* TODO: make sure to test these two functions manually */
+    /**
+     * Union of Config.LOGS_USERS and Config.TOPOLOGY_USERS from the topology conf,
+     * with nulls skipped, duplicates removed, and the result sorted.
+     * @param topologyConf the topology's config map
+     * @return sorted list of distinct users (empty if neither key is present)
+     */
+    public static List<String> getTopoLogsUsers(Map topologyConf) {
+        List<String> logsUsers = (List<String>)topologyConf.get(Config.LOGS_USERS);
+        List<String> topologyUsers = (List<String>)topologyConf.get(Config.TOPOLOGY_USERS);
+        Set<String> mergedUsers = new HashSet<String>();
+        if (logsUsers != null) {
+            for (String user : logsUsers) {
+                if (user != null) {
+                    mergedUsers.add(user);
+                }
+            }
+        }
+        if (topologyUsers != null) {
+            for (String user : topologyUsers) {
+                if (user != null) {
+                    mergedUsers.add(user);
+                }
+            }
+        }
+        List<String> ret = new ArrayList<String>(mergedUsers);
+        Collections.sort(ret);
+        return ret;
+    }
+
+    /**
+     * Union of Config.LOGS_GROUPS and Config.TOPOLOGY_GROUPS from the topology conf,
+     * with nulls skipped, duplicates removed, and the result sorted.
+     * @param topologyConf the topology's config map
+     * @return sorted list of distinct groups (empty if neither key is present)
+     */
+    public static List<String> getTopoLogsGroups(Map topologyConf) {
+        List<String> logsGroups = (List<String>)topologyConf.get(Config.LOGS_GROUPS);
+        List<String> topologyGroups = (List<String>)topologyConf.get(Config.TOPOLOGY_GROUPS);
+        Set<String> mergedGroups = new HashSet<String>();
+        if (logsGroups != null) {
+            for (String group : logsGroups) {
+                if (group != null) {
+                    mergedGroups.add(group);
+                }
+            }
+        }
+        if (topologyGroups != null) {
+            for (String group : topologyGroups) {
+                if (group != null) {
+                    mergedGroups.add(group);
+                }
+            }
+        }
+        List<String> ret = new ArrayList<String>(mergedGroups);
+        Collections.sort(ret);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
index 51250f4..8583e0d 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/InprocMessaging.java
@@ -19,41 +19,82 @@ package org.apache.storm.utils;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InprocMessaging {
-    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<>();
-    private static final Object _lock = new Object();
+    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>();
+    private static ConcurrentMap<Integer, AtomicBoolean> _hasReader = new ConcurrentHashMap<>();
     private static int port = 1;
+    private static final Logger LOG = LoggerFactory.getLogger(InprocMessaging.class);
     
-    public static int acquireNewPort() {
-        int ret;
-        synchronized(_lock) {
-            ret = port;
-            port++;
-        }
+    public synchronized static int acquireNewPort() {
+        int ret = port;
+        port++;
         return ret;
     }
     
     public static void sendMessage(int port, Object msg) {
+        waitForReader(port);
+        getQueue(port).add(msg);
+    }
+    
+    public static void sendMessageNoWait(int port, Object msg) {
         getQueue(port).add(msg);
     }
     
     public static Object takeMessage(int port) throws InterruptedException {
+        readerArrived(port);
         return getQueue(port).take();
     }
 
     public static Object pollMessage(int port) {
+        readerArrived(port);
         return  getQueue(port).poll();
-    }    
+    }
+    
+    /**
+     * Lazily create the "has a reader arrived" flag for a port.
+     * putIfAbsent followed by a re-get ensures concurrent callers all
+     * end up sharing the single AtomicBoolean stored in the map.
+     */
+    private static AtomicBoolean getHasReader(int port) {
+        AtomicBoolean ab = _hasReader.get(port);
+        if (ab == null) {
+            _hasReader.putIfAbsent(port, new AtomicBoolean(false));
+            ab = _hasReader.get(port);
+        }
+        return ab;
+    }
     
-    private static LinkedBlockingQueue<Object> getQueue(int port) {
-        synchronized(_lock) {
-            if(!_queues.containsKey(port)) {
-              _queues.put(port, new LinkedBlockingQueue<>());
+    /**
+     * Busy-wait (10ms sleeps) until a reader has attached to this port; when time is
+     * simulated, also advances simulated time 100ms per iteration so simulated readers
+     * can make progress. Logs an error and gives up after ~20 seconds.
+     */
+    public static void waitForReader(int port) {
+        AtomicBoolean ab = getHasReader(port);
+        long start = Time.currentTimeMillis();
+        while (!ab.get()) {
+            if (Time.isSimulating()) {
+                Time.advanceTime(100);
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                //Ignored
             }
-            return _queues.get(port);
+            if (Time.currentTimeMillis() - start > 20000) {
+                LOG.error("DONE WAITING FOR READER AFTER {} ms", Time.currentTimeMillis() - start);
+                break;
+            }
+        }
+    }
+    
+    /** Mark that a reader is now consuming from this port, releasing waitForReader(). */
+    private static void readerArrived(int port) {
+        getHasReader(port).set(true);
+    }
+    
+    /** Get (creating on first use) the queue for a port; synchronized to guard the shared HashMap. */
+    private synchronized static LinkedBlockingQueue<Object> getQueue(int port) {
+        if(!_queues.containsKey(port)) {
+            _queues.put(port, new LinkedBlockingQueue<Object>());
         }
+        return _queues.get(port);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
index aef1c1c..2f0bb60 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
@@ -18,24 +18,28 @@
 package org.apache.storm.utils;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.storm.generated.LSApprovedWorkers;
+import org.apache.storm.generated.LSSupervisorAssignments;
+import org.apache.storm.generated.LSSupervisorId;
+import org.apache.storm.generated.LSTopoHistory;
+import org.apache.storm.generated.LSTopoHistoryList;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.LocalStateData;
+import org.apache.storm.generated.ThriftSerializedObject;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-
-import org.apache.storm.generated.LocalStateData;
-import org.apache.storm.generated.ThriftSerializedObject;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
@@ -43,6 +47,11 @@ import org.apache.storm.generated.ThriftSerializedObject;
  */
 public class LocalState {
     public static final Logger LOG = LoggerFactory.getLogger(LocalState.class);
+    public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat";
+    public static final String LS_ID = "supervisor-id";
+    public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments";
+    public static final String LS_APPROVED_WORKERS = "approved-workers";
+    public static final String LS_TOPO_HISTORY = "topo-hist";
     private VersionedStore _vs;
     
     public LocalState(String backingDir) throws IOException {
@@ -157,6 +166,85 @@ public class LocalState {
         _vs.cleanup(keepVersions);
     }
 
+    /** @return the stored topology history entries, or null when none have been recorded. */
+    public List<LSTopoHistory> getTopoHistoryList() {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        if (null != lsTopoHistoryListWrapper) {
+            return lsTopoHistoryListWrapper.get_topo_history();
+        }
+        return null;
+    }
+
+    /**
+     * Remove topologies from local state which are older than cutOffAge.
+     * Rewrites LS_TOPO_HISTORY keeping only entries whose timestamp is newer than the cutoff.
+     * @param cutOffAge oldest timestamp (exclusive) to retain
+     */
+    public void filterOldTopologies(long cutOffAge) {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        List<LSTopoHistory> filteredTopoHistoryList = new ArrayList<>();
+        if (null != lsTopoHistoryListWrapper) {
+            for (LSTopoHistory topoHistory : lsTopoHistoryListWrapper.get_topo_history()) {
+                if (topoHistory.get_time_stamp() > cutOffAge) {
+                    filteredTopoHistoryList.add(topoHistory);
+                }
+            }
+        }
+        put(LS_TOPO_HISTORY, new LSTopoHistoryList(filteredTopoHistoryList));
+    }
+
+    /** Append one entry to the stored topology history (read-modify-write of LS_TOPO_HISTORY). */
+    public void addTopologyHistory(LSTopoHistory lsTopoHistory) {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        List<LSTopoHistory> currentTopoHistoryList = new ArrayList<>();
+        if (null != lsTopoHistoryListWrapper) {
+            currentTopoHistoryList.addAll(lsTopoHistoryListWrapper.get_topo_history());
+        }
+        currentTopoHistoryList.add(lsTopoHistory);
+        put(LS_TOPO_HISTORY, new LSTopoHistoryList(currentTopoHistoryList));
+    }
+
+    /** @return the persisted supervisor id, or null if none has been set. */
+    public String getSupervisorId() {
+        LSSupervisorId lsSupervisorId = (LSSupervisorId) get(LS_ID);
+        if (null != lsSupervisorId) {
+            return lsSupervisorId.get_supervisor_id();
+        }
+        return null;
+    }
+
+    /** Persist this supervisor's id under LS_ID. */
+    public void setSupervisorId(String supervisorId) {
+        put(LS_ID, new LSSupervisorId(supervisorId));
+    }
+
+    /** @return map of approved worker id to port, or null if none have been stored. */
+    public Map<String, Integer> getApprovedWorkers() {
+        LSApprovedWorkers lsApprovedWorkers = (LSApprovedWorkers) get(LS_APPROVED_WORKERS);
+        if (null != lsApprovedWorkers) {
+            return lsApprovedWorkers.get_approved_workers();
+        }
+        return null;
+    }
+
+    /** Persist the approved worker-id-to-port map under LS_APPROVED_WORKERS. */
+    public void setApprovedWorkers(Map<String, Integer> approvedWorkers) {
+        put(LS_APPROVED_WORKERS, new LSApprovedWorkers(approvedWorkers));
+    }
+
+    /** @return the last persisted worker heartbeat, or null if none exists. */
+    public LSWorkerHeartbeat getWorkerHeartBeat() {
+        return (LSWorkerHeartbeat) get(LS_WORKER_HEARTBEAT);
+    }
+
+    /** Persist the worker heartbeat. NOTE(review): the boolean arg presumably suppresses version cleanup for this frequent write — confirm against put()'s signature. */
+    public void setWorkerHeartBeat(LSWorkerHeartbeat workerHeartBeat) {
+        put(LS_WORKER_HEARTBEAT, workerHeartBeat, false);
+    }
+
+    /** @return map of port to LocalAssignment for this supervisor, or null if none stored. */
+    public Map<Integer, LocalAssignment> getLocalAssignmentsMap() {
+        LSSupervisorAssignments assignments = (LSSupervisorAssignments) get(LS_LOCAL_ASSIGNMENTS);
+        if (null != assignments) {
+            return assignments.get_assignments();
+        }
+        return null;
+    }
+
+    /** Persist the port-to-assignment map under LS_LOCAL_ASSIGNMENTS. */
+    public void setLocalAssignmentsMap(Map<Integer, LocalAssignment> localAssignmentMap) {
+        put(LS_LOCAL_ASSIGNMENTS, new LSSupervisorAssignments(localAssignmentMap));
+    }
+
     private void persistInternal(Map<String, ThriftSerializedObject> serialized, TSerializer ser, boolean cleanup) {
         try {
             if (ser == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index 44ae70e..e8cef09 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -17,11 +17,11 @@
  */
 package org.apache.storm.utils;
 
-
 import org.apache.storm.Config;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
 import com.google.common.collect.Lists;
@@ -29,6 +29,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.security.Principal;
 import java.util.List;
 import java.util.Map;
 
@@ -36,6 +37,22 @@ public class NimbusClient extends ThriftClient {
     private Nimbus.Client _client;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
 
+    /** Callback invoked with a connected Nimbus client by withConfiguredClient(). */
+    public interface WithNimbus {
+        public void run(Nimbus.Client client) throws Exception;
+    }
+
+    /** Run the callback against Nimbus using the default storm config read from disk. */
+    public static void withConfiguredClient(WithNimbus cb) throws Exception {
+        withConfiguredClient(cb, ConfigUtils.readStormConfig());
+    }
+
+    /**
+     * Run the callback with a Nimbus client authenticated as the current request-context
+     * principal (or anonymously when no principal is set). The client is closed by
+     * try-with-resources when the callback returns or throws.
+     */
+    public static void withConfiguredClient(WithNimbus cb, Map conf) throws Exception {
+        ReqContext context = ReqContext.context();
+        Principal principal = context.principal();
+        String user = principal == null ? null : principal.getName();
+        try (NimbusClient client = getConfiguredClientAs(conf, user);) {
+            cb.run(client.getClient());
+        }
+    }
 
     public static NimbusClient getConfiguredClient(Map conf) {
         return getConfiguredClientAs(conf, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 1792252..e501b6c 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -29,6 +29,7 @@ public class Time {
     public static final Logger LOG = LoggerFactory.getLogger(Time.class);
     
     private static AtomicBoolean simulating = new AtomicBoolean(false);
+    private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
     //TODO: should probably use weak references here or something
     private static volatile Map<Thread, AtomicLong> threadSleepTimes;
     private static final Object sleepTimesLock = new Object();
@@ -43,10 +44,18 @@ public class Time {
         }
     }
     
+    /** Enter simulated-time mode and auto-advance the simulated clock by ms each time a thread sleeps. */
+    public static void startSimulatingAutoAdvanceOnSleep(long ms) {
+        synchronized(sleepTimesLock) {
+            startSimulating();
+            autoAdvanceOnSleep.set(ms);
+        }
+    }
+    
     public static void stopSimulating() {
         synchronized(sleepTimesLock) {
-            simulating.set(false);             
-            threadSleepTimes = null;  
+            simulating.set(false);    
+            autoAdvanceOnSleep.set(0);
+            threadSleepTimes = null;
         }
     }
     
@@ -71,6 +80,10 @@ public class Time {
                             throw new InterruptedException();
                         }
                     }
+                    long autoAdvance = autoAdvanceOnSleep.get();
+                    if (autoAdvance > 0) {
+                        advanceTime(autoAdvance);
+                    }
                     Thread.sleep(10);
                 }
             } finally {
@@ -86,10 +99,16 @@ public class Time {
                 Thread.sleep(sleepTime);
         }
     }
-    
+
     public static void sleep(long ms) throws InterruptedException {
         sleepUntil(currentTimeMillis()+ms);
     }
+
+    /** Sleep for secs seconds via Time.sleep (simulation-aware); non-positive values are a no-op. */
+    public static void sleepSecs (long secs) throws InterruptedException {
+        if (secs > 0) {
+            sleep(secs * 1000);
+        }
+    }
     
     public static long currentTimeMillis() {
         if(simulating.get()) {
@@ -98,14 +117,32 @@ public class Time {
             return System.currentTimeMillis();
         }
     }
-    
+
+    /** Convert whole seconds to milliseconds, widening to long before multiplying to avoid int overflow. */
+    public static long secsToMillis (int secs) {
+        return 1000*(long) secs;
+    }
+
+    /** Convert fractional seconds to milliseconds, truncating toward zero. */
+    public static long secsToMillisLong(double secs) {
+        return (long) (1000 * secs);
+    }
+
     public static int currentTimeSecs() {
         return (int) (currentTimeMillis() / 1000);
     }
+
+    /** Seconds elapsed since the given time, using the (possibly simulated) clock. */
+    public static int deltaSecs(int timeInSeconds) {
+        return Time.currentTimeSecs() - timeInSeconds;
+    }
+
+    /** Milliseconds elapsed since the given time, using the (possibly simulated) clock. */
+    public static long deltaMs(long timeInMilliseconds) {
+        return Time.currentTimeMillis() - timeInMilliseconds;
+    }
     
     public static void advanceTime(long ms) {
-        if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
-        simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
+        if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+        if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+        long newTime = simulatedCurrTimeMs.addAndGet(ms);
+        LOG.warn("Advanced simulated time to {}", newTime);
     }
     
     public static boolean isThreadWaiting(Thread t) {
@@ -115,5 +152,5 @@ public class Time {
             time = threadSleepTimes.get(t);
         }
         return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
-    }    
+    }
 }


Mime
View raw message