storm-commits mailing list archives

From kabh...@apache.org
Subject [05/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:39 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
new file mode 100644
index 0000000..c92db0c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+    
+    static enum MachineState {
+        EMPTY,
+        RUNNING,
+        WAITING_FOR_WORKER_START,
+        KILL_AND_RELAUNCH,
+        KILL,
+        WAITING_FOR_BASIC_LOCALIZATION,
+        WAITING_FOR_BLOB_LOCALIZATION;
+    };
+    
+    static class StaticState {
+        public final ILocalizer localizer;
+        public final long hbTimeoutMs;
+        public final long firstHbTimeoutMs;
+        public final long killSleepMs;
+        public final long monitorFreqMs;
+        public final ContainerLauncher containerLauncher;
+        public final int port;
+        public final String host;
+        public final ISupervisor iSupervisor;
+        public final LocalState localState;
+        
+        StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+                long killSleepMs, long monitorFreqMs,
+                ContainerLauncher containerLauncher, String host, int port,
+                ISupervisor iSupervisor, LocalState localState) {
+            this.localizer = localizer;
+            this.hbTimeoutMs = hbTimeoutMs;
+            this.firstHbTimeoutMs = firstHbTimeoutMs;
+            this.containerLauncher = containerLauncher;
+            this.killSleepMs = killSleepMs;
+            this.monitorFreqMs = monitorFreqMs;
+            this.host = host;
+            this.port = port;
+            this.iSupervisor = iSupervisor;
+            this.localState = localState;
+        }
+    }
+    
+    static class DynamicState {
+        public final MachineState state;
+        public final LocalAssignment newAssignment;
+        public final LocalAssignment currentAssignment;
+        public final Container container;
+        public final LocalAssignment pendingLocalization;
+        public final Future<Void> pendingDownload;
+        public final Set<TopoProfileAction> profileActions;
+        public final Set<TopoProfileAction> pendingStopProfileActions;
+        
+        /**
+         * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH was entered.
+         */
+        public final long startTime;
+        
+        public DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment) {
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            if ((currentAssignment == null) ^ (container == null)) {
+                throw new IllegalArgumentException("Container and current assignment must both be null, or neither can be null");
+            }
+            
+            if (currentAssignment == null) {
+                state = MachineState.EMPTY;
+            } else {
+                state = MachineState.RUNNING;
+            }
+            
+            this.startTime = System.currentTimeMillis();
+            this.newAssignment = newAssignment;
+            this.pendingLocalization = null;
+            this.pendingDownload = null;
+            this.profileActions = new HashSet<>();
+            this.pendingStopProfileActions = new HashSet<>();
+        }
+        
+        public DynamicState(final MachineState state, final LocalAssignment newAssignment,
+                final Container container, final LocalAssignment currentAssignment,
+                final LocalAssignment pendingLocalization, final long startTime,
+                final Future<Void> pendingDownload, final Set<TopoProfileAction> profileActions, 
+                final Set<TopoProfileAction> pendingStopProfileActions) {
+            this.state = state;
+            this.newAssignment = newAssignment;
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            this.pendingLocalization = pendingLocalization;
+            this.startTime = startTime;
+            this.pendingDownload = pendingDownload;
+            this.profileActions = profileActions;
+            this.pendingStopProfileActions = pendingStopProfileActions;
+        }
+        
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+            sb.append(state);
+            sb.append(" msInState: ");
+            sb.append(Time.currentTimeMillis() - startTime);
+            if (container != null) {
+                sb.append(" ");
+                sb.append(container);
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Set the new assignment for the state.  This should never be called from within the state machine.
+         * It is an input from outside.
+         * @param newAssignment the new assignment to set
+         * @return the updated DynamicState.
+         */
+        public DynamicState withNewAssignment(LocalAssignment newAssignment) {
+            return new DynamicState(this.state, newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+        
+        public DynamicState withPendingLocalization(LocalAssignment pendingLocalization, Future<Void> pendingDownload) {
+            return new DynamicState(this.state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    pendingLocalization, this.startTime,
+                    pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+        
+        public DynamicState withPendingLocalization(Future<Void> pendingDownload) {
+            return withPendingLocalization(this.pendingLocalization, pendingDownload);
+        }
+        
+        public DynamicState withState(final MachineState state) {
+            long newStartTime = Time.currentTimeMillis();
+            return new DynamicState(state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, newStartTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+
+        public DynamicState withCurrentAssignment(final Container container, final LocalAssignment currentAssignment) {
+            return new DynamicState(this.state, this.newAssignment,
+                    container, currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+
+        public DynamicState withProfileActions(Set<TopoProfileAction> profileActions, Set<TopoProfileAction> pendingStopProfileActions) {
+            return new DynamicState(this.state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, profileActions,
+                    pendingStopProfileActions);
+        }
+    };
+    
+    static class TopoProfileAction {
+        public final String topoId;
+        public final ProfileRequest request;
+        
+        public TopoProfileAction(String topoId, ProfileRequest request) {
+            this.topoId = topoId;
+            this.request = request;
+        }
+        
+        @Override
+        public int hashCode() {
+            return (37 * topoId.hashCode()) + request.hashCode(); 
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TopoProfileAction)) {
+                return false;
+            }
+            TopoProfileAction o = (TopoProfileAction) other;
+            return topoId.equals(o.topoId) && request.equals(o.request);
+        }
+        
+        @Override
+        public String toString() {
+            return "{ " + topoId + ": " + request + " }";
+        }
+    }
+    
+    static boolean equivalent(LocalAssignment a, LocalAssignment b) {
+        if (a == null && b == null) {
+            return true;
+        }
+        if (a != null && b != null) {
+            if (a.get_topology_id().equals(b.get_topology_id())) {
+                Set<ExecutorInfo> aexec = new HashSet<>(a.get_executors());
+                Set<ExecutorInfo> bexec = new HashSet<>(b.get_executors());
+                if (aexec.equals(bexec)) {
+                    boolean aHasResources = a.is_set_resources();
+                    boolean bHasResources = b.is_set_resources();
+                    if (!aHasResources && !bHasResources) {
+                        return true;
+                    }
+                    if (aHasResources && bHasResources) {
+                        if (a.get_resources().equals(b.get_resources())) {
+                            return true;
+                        }
+                    }
+                }
+            }
+        }
+        return false;
+    }
+    
+    static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception {
+        LOG.debug("STATE {}", dynamicState.state);
+        switch (dynamicState.state) {
+            case EMPTY:
+                return handleEmpty(dynamicState, staticState);
+            case RUNNING:
+                return handleRunning(dynamicState, staticState);
+            case WAITING_FOR_WORKER_START:
+                return handleWaitingForWorkerStart(dynamicState, staticState);
+            case KILL_AND_RELAUNCH:
+                return handleKillAndRelaunch(dynamicState, staticState);
+            case KILL:
+                return handleKill(dynamicState, staticState);
+            case WAITING_FOR_BASIC_LOCALIZATION:
+                return handleWaitingForBasicLocalization(dynamicState, staticState);
+            case WAITING_FOR_BLOB_LOCALIZATION:
+                return handleWaitingForBlobLocalization(dynamicState, staticState);
+            default:
+                throw new IllegalStateException("Code not ready to handle a state of "+dynamicState.state);
+        }
+    }
+    
+    /**
+     * Prepare for a new assignment by downloading the newly required blobs, or go to EMPTY if there is nothing to download.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws IOException on any error
+     */
+    static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
+        assert(dynamicState.container == null);
+        
+        if (dynamicState.newAssignment == null) {
+            return dynamicState.withState(MachineState.EMPTY);
+        }
+        Future<Void> pendingDownload = staticState.localizer.requestDownloadBaseTopologyBlobs(dynamicState.newAssignment, staticState.port);
+        return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload).withState(MachineState.WAITING_FOR_BASIC_LOCALIZATION);
+    }
+    
+    /**
+     * Kill the current container and start downloading what the new assignment needs, if there is one.
+     * PRECONDITION: container != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception 
+     */
+    static DynamicState killContainerForChangedAssignment(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        
+        staticState.iSupervisor.killedWorker(staticState.port);
+        dynamicState.container.kill();
+        Future<Void> pendingDownload = null;
+        if (dynamicState.newAssignment != null) {
+            pendingDownload = staticState.localizer.requestDownloadBaseTopologyBlobs(dynamicState.newAssignment, staticState.port);
+        }
+        Time.sleep(staticState.killSleepMs);
+        return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload).withState(MachineState.KILL);
+    }
+    
+    /**
+     * Kill the current container and relaunch it.  (Something odd happened)
+     * PRECONDITION: container != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception 
+     */
+    static DynamicState killAndRelaunchContainer(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        
+        dynamicState.container.kill();
+        Time.sleep(staticState.killSleepMs);
+        
+        //Any stop profile actions that had not yet timed out should be restarted after the worker is running again.
+        HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
+        mod.addAll(dynamicState.pendingStopProfileActions);
+        return dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.<TopoProfileAction> emptySet());
+    }
+    
+    /**
+     * Clean up a container
+     * PRECONDITION: All of the processes have died.
+     * @param dynamicState current state
+     * @param staticState static data
+     * @param nextState the next MachineState to go to.
+     * @return the next state.
+     */
+    static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        assert(dynamicState.container.areAllProcessesDead());
+        
+        dynamicState.container.cleanUp();
+        staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
+        DynamicState ret = dynamicState.withCurrentAssignment(null, null);
+        if (nextState != null) {
+            ret = ret.withState(nextState);
+        }
+        return ret;
+    }
+    
+    /**
+     * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
+     * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.pendingLocalization != null);
+        assert(dynamicState.pendingDownload != null);
+        assert(dynamicState.container == null);
+        
+        //Ignore changes to scheduling while downloading the topology blobs
+        // We don't support canceling the download through the future yet,
+        // so to keep everything in sync, just wait
+        try {
+            dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
+            //Downloading of all blobs finished.
+            if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
+                //Scheduling changed
+                staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
+                return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
+            }
+            Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
+            return dynamicState.withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START).withPendingLocalization(null, null);
+        } catch (TimeoutException e) {
+            //We waited for 1 second; loop around and try again.
+            return dynamicState;
+        }
+    }
+    
+    /**
+     * State Transitions for WAITING_FOR_BASIC_LOCALIZATION state.
+     * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForBasicLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.pendingLocalization != null);
+        assert(dynamicState.pendingDownload != null);
+        assert(dynamicState.container == null);
+        
+        //Ignore changes to scheduling while downloading the topology code
+        // We don't support canceling the download through the future yet,
+        // so to keep everything in sync, just wait
+        try {
+            dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
+            Future<Void> pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.pendingLocalization, staticState.port);
+            return dynamicState.withPendingLocalization(pendingDownload).withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
+        } catch (TimeoutException e) {
+            return dynamicState;
+        }
+    }
+
+    /**
+     * State Transitions for KILL state.
+     * PRECONDITION: container.kill() was called
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (dynamicState.container.areAllProcessesDead()) {
+            LOG.warn("SLOT {} all processes are dead...", staticState.port);
+            return cleanupCurrentContainer(dynamicState, staticState, 
+                    dynamicState.pendingLocalization == null ? MachineState.EMPTY : MachineState.WAITING_FOR_BASIC_LOCALIZATION);
+        }
+
+        LOG.warn("SLOT {} force kill and wait...", staticState.port);
+        dynamicState.container.forceKill();
+        Time.sleep(staticState.killSleepMs);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for KILL_AND_RELAUNCH state.
+     * PRECONDITION: container.kill() was called
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (dynamicState.container.areAllProcessesDead()) {
+            if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
+                dynamicState.container.cleanUpForRestart();
+                dynamicState.container.relaunch();
+                return dynamicState.withState(MachineState.WAITING_FOR_WORKER_START);
+            }
+            //Scheduling changed after we killed all of the processes
+            return prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, staticState, null), staticState);
+        }
+        //The child processes typically exit in < 1 sec.  If they are still around 2 minutes later, something is wrong
+        if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) {
+            throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds");
+        }
+        dynamicState.container.forceKill();
+        Time.sleep(staticState.killSleepMs);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for WAITING_FOR_WORKER_START state.
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
+        if (hb != null) {
+            long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
+            if (hbAgeMs <= staticState.hbTimeoutMs) {
+                return dynamicState.withState(MachineState.RUNNING);
+            }
+        }
+        
+        if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
+            //We were rescheduled while waiting for the worker to come up
+            LOG.warn("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, dynamicState.newAssignment);
+            return killContainerForChangedAssignment(dynamicState, staticState);
+        }
+        
+        long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
+        if (timeDiffms > staticState.firstHbTimeoutMs) {
+            LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container, staticState.firstHbTimeoutMs);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        Time.sleep(1000);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for RUNNING state.
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
+            LOG.warn("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, dynamicState.newAssignment);
+            //Scheduling changed while running...
+            return killContainerForChangedAssignment(dynamicState, staticState);
+        }
+        if (dynamicState.container.didMainProcessExit()) {
+            LOG.warn("SLOT {}: main process has exited", staticState.port);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
+        if (hb == null) {
+            LOG.warn("SLOT {}: HB returned as null", staticState.port);
+            //This can happen if the supervisor crashed after launching a
+            // worker that never came up.
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
+        if (timeDiffMs > staticState.hbTimeoutMs) {
+            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        //The worker is up and running; check for profiling requests
+        if (!dynamicState.profileActions.isEmpty()) {
+            HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
+            HashSet<TopoProfileAction> modPending = new HashSet<>(dynamicState.pendingStopProfileActions);
+            Iterator<TopoProfileAction> iter = mod.iterator();
+            while (iter.hasNext()) {
+                TopoProfileAction action = iter.next();
+                if (!action.topoId.equals(dynamicState.currentAssignment.get_topology_id())) {
+                    iter.remove();
+                    LOG.warn("Dropping {} wrong topology is running", action);
+                    //Not for this topology so skip it
+                } else {
+                    if (modPending.contains(action)) {
+                        boolean isTimeForStop = Time.currentTimeMillis() > action.request.get_time_stamp();
+                        if (isTimeForStop) {
+                            if (dynamicState.container.runProfiling(action.request, true)) {
+                                LOG.debug("Stopped {} action finished", action);
+                                iter.remove();
+                                modPending.remove(action);
+                            } else {
+                                LOG.warn("Stopping {} failed, will be retried", action);
+                            }
+                        } else {
+                            LOG.debug("Still pending {} now: {}", action, Time.currentTimeMillis());
+                        }
+                    } else {
+                        //JPROFILE_START is not used.  When we see a JPROFILE_STOP,
+                        // start profiling and save the action away so it can be stopped when its timeout expires
+                        if (action.request.get_action() == ProfileAction.JPROFILE_STOP) {
+                            if (dynamicState.container.runProfiling(action.request, false)) {
+                                modPending.add(action);
+                                LOG.debug("Started {} now: {}", action, Time.currentTimeMillis());
+                            } else {
+                                LOG.warn("Starting {} failed, will be retried", action);
+                            }
+                        } else {
+                            if (dynamicState.container.runProfiling(action.request, false)) {
+                                LOG.debug("Started {} action finished", action);
+                                iter.remove();
+                            } else {
+                                LOG.warn("Starting {} failed, will be retried", action);
+                            }
+                        }
+                    }
+                }
+            }
+            dynamicState = dynamicState.withProfileActions(mod, modPending);
+        }
+        Time.sleep(staticState.monitorFreqMs);
+        return dynamicState;
+    }
+
+    static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException {
+        if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
+            return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
+        }
+        //Both assignments are null, just wait
+        if (dynamicState.profileActions != null && !dynamicState.profileActions.isEmpty()) {
+            //Nothing is scheduled here so throw away all of the profileActions
+            LOG.warn("Dropping {} no topology is running", dynamicState.profileActions);
+            dynamicState = dynamicState.withProfileActions(Collections.<TopoProfileAction> emptySet(), Collections.<TopoProfileAction> emptySet());
+        }
+        Time.sleep(1000);
+        return dynamicState;
+    }
+    
+    private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>();
+    private final AtomicReference<Set<TopoProfileAction>> profiling =
+            new AtomicReference<Set<TopoProfileAction>>(new HashSet<TopoProfileAction>());
+    private final StaticState staticState;
+    private final IStormClusterState clusterState;
+    private volatile boolean done = false;
+    private volatile DynamicState dynamicState;
+    private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
+    
+    public Slot(ILocalizer localizer, Map<String, Object> conf, 
+            ContainerLauncher containerLauncher, String host,
+            int port, LocalState localState,
+            IStormClusterState clusterState,
+            ISupervisor iSupervisor,
+            AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+        super("SLOT_"+port);
+
+        this.cachedCurrentAssignments = cachedCurrentAssignments;
+        this.clusterState = clusterState;
+        Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
+        LocalAssignment currentAssignment = null;
+        if (assignments != null) {
+            currentAssignment = assignments.get(port);
+        }
+        Container container = null;
+        if (currentAssignment != null) { 
+            try {
+                container = containerLauncher.recoverContainer(port, currentAssignment, localState);
+            } catch (ContainerRecoveryException e) {
+                //We could not recover the container; it stays null.
+            }
+        }
+        
+        LocalAssignment newAssignment = currentAssignment;
+        if (currentAssignment != null && container == null) {
+            currentAssignment = null;
+            //Assigned something but it is not running
+        }
+        
+        dynamicState = new DynamicState(currentAssignment, container, newAssignment);
+        staticState = new StaticState(localizer, 
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
+                containerLauncher,
+                host,
+                port,
+                iSupervisor,
+                localState);
+        this.newAssignment.set(dynamicState.newAssignment);
+        if (MachineState.RUNNING == dynamicState.state) {
+            //We are running so we should recover the blobs.
+            staticState.localizer.recoverRunningTopology(currentAssignment, port);
+            saveNewAssignment(currentAssignment);
+        }
+        LOG.warn("SLOT {}:{} Starting in state {} - assignment {}", staticState.host, staticState.port, dynamicState.state, dynamicState.currentAssignment);
+    }
+    
+    public MachineState getMachineState() {
+        return dynamicState.state;
+    }
+    
+    /**
+     * Set a new assignment asynchronously
+     * @param newAssignment the new assignment for this slot to run, null to run nothing
+     */
+    public void setNewAssignment(LocalAssignment newAssignment) {
+        this.newAssignment.set(newAssignment);
+    }
+    
+    public void addProfilerActions(Set<TopoProfileAction> actions) {
+        if (actions != null) {
+            while(true) {
+                Set<TopoProfileAction> orig = profiling.get();
+                Set<TopoProfileAction> newActions = new HashSet<>(orig);
+                newActions.addAll(actions);
+                if (profiling.compareAndSet(orig, newActions)) {
+                    return;
+                }
+            }
+        }
+    }
+    
+    public String getWorkerId() {
+        String workerId = null;
+        Container c = dynamicState.container;
+        if (c != null) {
+            workerId = c.getWorkerId();
+        }
+        return workerId;
+    }
+    
+    private void saveNewAssignment(LocalAssignment assignment) {
+        synchronized(staticState.localState) {
+            Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap();
+            if (assignments == null) {
+                assignments = new HashMap<>();
+            }
+            if (assignment == null) {
+                assignments.remove(staticState.port);
+            } else {
+                assignments.put(staticState.port, assignment);
+            }
+            staticState.localState.setLocalAssignmentsMap(assignments);
+        }
+        Map<Long, LocalAssignment> update = null;
+        Map<Long, LocalAssignment> orig = null;
+        do {
+            Long lport = new Long(staticState.port);
+            orig = cachedCurrentAssignments.get();
+            update = new HashMap<>(orig);
+            if (assignment == null) {
+                update.remove(lport);
+            } else {
+                update.put(lport, assignment);
+            }
+        } while (!cachedCurrentAssignments.compareAndSet(orig, update));
+    }
+    
+    public void run() {
+        try {
+            while(!done) {
+                Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());
+                Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);
+                
+                DynamicState nextState = 
+                        stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
+                                .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions), staticState);
+
+                if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
+                    LOG.info("STATE {} -> {}", dynamicState, nextState);
+                }
+                //Save the current state for recovery
+                if (!equivalent(nextState.currentAssignment, dynamicState.currentAssignment)) {
+                    LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, nextState.currentAssignment);
+                    saveNewAssignment(nextState.currentAssignment);
+                }
+                
+                // clean up the profiler actions that are not being processed
+                removed.removeAll(dynamicState.profileActions);
+                removed.removeAll(dynamicState.pendingStopProfileActions);
+                for (TopoProfileAction action: removed) {
+                    try {
+                        clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
+                    } catch (Exception e) {
+                        LOG.error("Error trying to remove profiling request, it will be retried", e);
+                    }
+                }
+                Set<TopoProfileAction> orig, copy;
+                do {
+                    orig = profiling.get();
+                    copy = new HashSet<>(orig);
+                    copy.removeAll(removed);
+                } while (!profiling.compareAndSet(orig, copy));
+                dynamicState = nextState;
+            }
+        } catch (Throwable e) {
+            if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
+                LOG.error("Error when processing event", e);
+                Utils.exitProcess(20, "Error when processing an event");
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        done = true;
+        this.interrupt();
+        this.join();
+    }
+}
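
The Slot above drives each worker slot with an explicit state machine, and it keeps all of its mutable bookkeeping in an immutable DynamicState snapshot: every transition builds a new snapshot through the with* methods instead of mutating fields in place. Below is a minimal, self-contained sketch of that copy-on-write style; the names (SlotSnapshot, withState) are illustrative only and are not part of this patch.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: mirrors the DynamicState "with*" copy-on-write style.
final class SlotSnapshot {
    enum State { EMPTY, RUNNING }

    final State state;
    final Set<String> profileActions;
    final long startTime;

    SlotSnapshot(State state, Set<String> profileActions, long startTime) {
        this.state = state;
        this.profileActions = Collections.unmodifiableSet(profileActions);
        this.startTime = startTime;
    }

    // Return a new snapshot with the state replaced and the timer reset,
    // leaving the original snapshot untouched.
    SlotSnapshot withState(State newState) {
        return new SlotSnapshot(newState, new HashSet<>(profileActions), System.currentTimeMillis());
    }

    public static void main(String[] args) {
        SlotSnapshot empty = new SlotSnapshot(State.EMPTY, new HashSet<String>(), System.currentTimeMillis());
        SlotSnapshot running = empty.withState(State.RUNNING);
        System.out.println(empty.state + " -> " + running.state); // prints EMPTY -> RUNNING
    }
}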

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
new file mode 100644
index 0000000..c14b060
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+public class StandaloneSupervisor implements ISupervisor {
+    private String supervisorId;
+    private Map conf;
+
+    @Override
+    public void prepare(Map stormConf, String schedulerLocalDir) {
+        try {
+            LocalState localState = new LocalState(schedulerLocalDir);
+            String supervisorId = localState.getSupervisorId();
+            if (supervisorId == null) {
+                supervisorId = generateSupervisorId();
+                localState.setSupervisorId(supervisorId);
+            }
+            this.conf = stormConf;
+            this.supervisorId = supervisorId;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getSupervisorId() {
+        return supervisorId;
+    }
+
+    @Override
+    public String getAssignmentId() {
+        return supervisorId;
+    }
+
+    @Override
+    public Object getMetadata() {
+        Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS);
+        return ports;
+    }
+
+    @Override
+    public boolean confirmAssigned(int port) {
+        return true;
+    }
+
+    @Override
+    public void killedWorker(int port) {
+
+    }
+
+    @Override
+    public void assigned(Collection<Integer> ports) {
+
+    }
+
+    public String generateSupervisorId(){
+        return Utils.uuid();
+    }
+}
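
StandaloneSupervisor.prepare() generates a supervisor id once, stores it in LocalState, and reuses the stored value on every later start, so the id survives supervisor restarts. Below is a minimal sketch of that generate-once-then-reuse pattern using a plain file in place of LocalState; the file name and helper method are assumptions for illustration, not Storm API.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

// Illustrative only: persists a generated id so restarts reuse it,
// mirroring what prepare() does with LocalState.getSupervisorId().
public class SupervisorIdExample {
    static String loadOrCreateId(Path idFile) throws IOException {
        if (Files.exists(idFile)) {
            return new String(Files.readAllBytes(idFile), StandardCharsets.UTF_8).trim();
        }
        String id = UUID.randomUUID().toString(); // stands in for Utils.uuid()
        Files.write(idFile, id.getBytes(StandardCharsets.UTF_8));
        return id;
    }

    public static void main(String[] args) throws IOException {
        Path idFile = Paths.get("supervisor-id.txt");
        System.out.println("supervisor id: " + loadOrCreateId(idFile));
    }
}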

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
new file mode 100644
index 0000000..1399e8d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Supervisor implements DaemonCommon, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+    private final Map<String, Object> conf;
+    private final IContext sharedContext;
+    private volatile boolean active;
+    private final ISupervisor iSupervisor;
+    private final Utils.UptimeComputer upTime;
+    private final String stormVersion;
+    private final IStormClusterState stormClusterState;
+    private final LocalState localState;
+    private final String supervisorId;
+    private final String assignmentId;
+    private final String hostName;
+    // used for reporting used ports when heartbeating
+    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
+    private final StormTimer heartbeatTimer;
+    private final StormTimer eventTimer;
+    private final StormTimer blobUpdateTimer;
+    private final Localizer localizer;
+    private final ILocalizer asyncLocalizer;
+    private EventManager eventManager;
+    private ReadClusterState readState;
+    
+    private Supervisor(ISupervisor iSupervisor) throws IOException {
+        this(Utils.readStormConfig(), null, iSupervisor);
+    }
+    
+    public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) throws IOException {
+        this.conf = conf;
+        this.iSupervisor = iSupervisor;
+        this.active = true;
+        this.upTime = Utils.makeUptimeComputer();
+        this.stormVersion = VersionInfo.getVersion();
+        this.sharedContext = sharedContext;
+        
+        iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+        
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = SupervisorUtils.supervisorZkAcls();
+        }
+
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+        } catch (Exception e) {
+            LOG.error("supervisor can't create stormClusterState");
+            throw Utils.wrapInRuntime(e);
+        }
+
+        try {
+            this.localState = ConfigUtils.supervisorState(conf);
+            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
+            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        this.supervisorId = iSupervisor.getSupervisorId();
+        this.assignmentId = iSupervisor.getAssignmentId();
+
+        try {
+            this.hostName = Utils.hostname(conf);
+        } catch (UnknownHostException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
+
+        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+
+        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
+    }
+    
+    public String getId() {
+        return supervisorId;
+    }
+    
+    IContext getSharedContext() {
+        return sharedContext;
+    }
+
+    public Map<String, Object> getConf() {
+        return conf;
+    }
+
+    public ISupervisor getiSupervisor() {
+        return iSupervisor;
+    }
+
+    public Utils.UptimeComputer getUpTime() {
+        return upTime;
+    }
+
+    public String getStormVersion() {
+        return stormVersion;
+    }
+
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    LocalState getLocalState() {
+        return localState;
+    }
+
+    public String getAssignmentId() {
+        return assignmentId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
+        return currAssignment;
+    }
+
+    public Localizer getLocalizer() {
+        return localizer;
+    }
+    
+    ILocalizer getAsyncLocalizer() {
+        return asyncLocalizer;
+    }
+    
+    EventManager getEventManger() {
+        return eventManager;
+    }
+    
+    /**
+     * Launch the supervisor
+     */
+    public void launch() throws Exception {
+        LOG.info("Starting Supervisor with conf {}", conf);
+        String path = ConfigUtils.supervisorTmpDir(conf);
+        FileUtils.cleanDirectory(new File(path));
+
+        Localizer localizer = getLocalizer();
+
+        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
+        hb.run();
+        // should synchronize supervisor so it doesn't launch anything after being down (optimization)
+        Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+        heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
+
+        this.eventManager = new EventManagerImp(false);
+        this.readState = new ReadClusterState(this);
+        
+        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
+        for (String topoId : downloadedTopoIds) {
+            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        }
+        // do this after adding the references so we don't try to clean things being used
+        localizer.startCleaner();
+
+        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+
+        if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+            // This isn't strictly necessary, but it doesn't hurt and ensures that the state machine stays up
+            // to date even if callbacks don't all work exactly right
+            eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
+
+            // Blob update thread. Starts after a 30 second delay, then runs every 30 seconds
+            blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
+
+            // supervisor health check
+            eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
+        }
+        LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
+    }
+
+    /**
+     * Start the distributed supervisor daemon.
+     */
+    private void launchDaemon() {
+        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
+        try {
+            Map<String, Object> conf = getConf();
+            if (ConfigUtils.isLocalMode(conf)) {
+                throw new IllegalArgumentException("Cannot start server in local mode!");
+            }
+            launch();
+            Utils.addShutdownHookWithForceKillIn1Sec(new Runnable(){ 
+                @Override
+                public void run() {
+                    close();
+                }
+            });
+            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
+            StormMetricsRegistry.startMetricsReporters(conf);
+        } catch (Exception e) {
+            LOG.error("Failed to start supervisor\n", e);
+            System.exit(1);
+        }
+    }
+
+    private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
+        StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
+            @Override
+            public Integer call() throws Exception {
+                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
+                return pids.size();
+            }
+        });
+    }
+    
+    @Override
+    public void close() {
+        try {
+            LOG.info("Shutting down supervisor {}", getId());
+            this.active = false;
+            heartbeatTimer.close();
+            eventTimer.close();
+            blobUpdateTimer.close();
+            if (eventManager != null) {
+                eventManager.close();
+            }
+            if (readState != null) {
+                readState.close();
+            }
+            getStormClusterState().disconnect();
+        } catch (Exception e) {
+            LOG.error("Error Shutting down", e);
+        }
+    }
+    
+    void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
+        HashSet<Killable> containers = new HashSet<>();
+        for (String workerId : workerIds) {
+            try {
+                Killable k = launcher.recoverContainer(workerId, localState);
+                if (!k.areAllProcessesDead()) {
+                    k.kill();
+                    containers.add(k);
+                } else {
+                    k.cleanUp();
+                }
+            } catch (Exception e) {
+                LOG.error("Error trying to kill {}", workerId, e);
+            }
+        }
+        int shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1);
+        if (!containers.isEmpty()) {
+            Time.sleepSecs(shutdownSleepSecs);
+        }
+        for (Killable k: containers) {
+            try {
+                k.forceKill();
+                long start = Time.currentTimeMillis();
+                while(!k.areAllProcessesDead()) {
+                    if ((Time.currentTimeMillis() - start) > 10_000) {
+                        throw new RuntimeException("Giving up on killing " + k 
+                                + " after " + (Time.currentTimeMillis() - start) + " ms");
+                    }
+                    Time.sleep(100);
+                    k.forceKill();
+                }
+                k.cleanUp();
+            } catch (Exception e) {
+                LOG.error("Error trying to clean up {}", k, e);
+            }
+        }
+    }
+
+    public void shutdownAllWorkers() {
+        if (readState != null) {
+            readState.shutdownAllWorkers();
+        } else {
+            try {
+                ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
+                killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
+            } catch (Exception e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean isWaiting() {
+        if (!active) {
+            return true;
+        }
+
+        if (heartbeatTimer.isTimerWaiting() && eventTimer.isTimerWaiting() && eventManager.waiting()) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Supervisor daemon entry point.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        @SuppressWarnings("resource")
+        Supervisor instance = new Supervisor(new StandaloneSupervisor());
+        instance.launchDaemon();
+    }
+}
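
The killWorkers() helper above escalates: it first asks each recovered container to exit with kill(), sleeps briefly, then repeatedly calls forceKill() until every process is dead or a 10 second budget is exhausted. Below is a minimal, self-contained sketch of that escalation loop against a stand-in Killable interface; the interface and the stub implementation are simplified assumptions for illustration, not the classes from this patch.

// Illustrative only: kill, wait briefly, then escalate to forceKill()
// until the process dies or a 10 second budget is exhausted,
// mirroring the loop in Supervisor.killWorkers().
public class KillEscalationExample {
    interface Killable {
        boolean areAllProcessesDead();
        void kill();
        void forceKill();
    }

    static void killWithEscalation(Killable k) throws InterruptedException {
        if (k.areAllProcessesDead()) {
            return;
        }
        k.kill();                 // polite request first
        Thread.sleep(1_000);      // give the worker a moment to exit
        long start = System.currentTimeMillis();
        while (!k.areAllProcessesDead()) {
            if (System.currentTimeMillis() - start > 10_000) {
                throw new RuntimeException("Giving up on killing " + k);
            }
            k.forceKill();        // escalate and poll
            Thread.sleep(100);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stub that "dies" after two forceKill() calls, just to exercise the loop.
        Killable stub = new Killable() {
            int forceKills = 0;
            public boolean areAllProcessesDead() { return forceKills >= 2; }
            public void kill() { }
            public void forceKill() { forceKills++; }
        };
        killWithEscalation(stub);
        System.out.println("worker is dead");
    }
}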

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
new file mode 100644
index 0000000..19d3b78
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class SupervisorUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+    private static SupervisorUtils _instance = INSTANCE;
+    public static void setInstance(SupervisorUtils u) {
+        _instance = u;
+    }
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+                                          final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
+        if (StringUtils.isBlank(user)) {
+            throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
+        }
+        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+        String wl;
+        if (StringUtils.isNotBlank(wlinitial)) {
+            wl = wlinitial;
+        } else {
+            wl = stormHome + "/bin/worker-launcher";
+        }
+        List<String> commands = new ArrayList<>();
+        if (commandPrefix != null){
+            commands.addAll(commandPrefix);
+        }
+        commands.add(wl);
+        commands.add(user);
+        commands.addAll(args);
+        LOG.info("Running as user: {} command: {}", user, commands);
+        return SupervisorUtils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+    }
+
+    public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+            throws IOException {
+        Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
+        if (StringUtils.isNotBlank(logPreFix)) {
+            Utils.readAndLogStream(logPreFix, process.getInputStream());
+        }
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            LOG.info("{} interrupted.", logPreFix);
+        }
+        return process.exitValue();
+    }
+
+    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "Storm Code Dir Setup for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("code-dir");
+            commands.add(dir);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    public static void setupWorkerArtifactsDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "Worker Artifacts Setup for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("artifacts-dir");
+            commands.add(dir);
+            processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
+        String user = Utils.getFileOwner(path);
+        String logPreFix = "rmr " + id;
+        List<String> commands = new ArrayList<>();
+        commands.add("rmr");
+        commands.add(path);
+        SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix);
+        if (Utils.checkFileExists(path)) {
+            throw new RuntimeException(path + " was not deleted.");
+        }
+    }
+
+    /**
+     * Given the blob information, returns the value of the uncompress field, handling both string
+     * and boolean values. Returns false if the field is not specified.
+     *
+     * @param blobInfo the settings for a single blob from the blobstore map
+     * @return whether the blob should be uncompressed after it is downloaded
+     */
+    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
+        return Utils.getBoolean(blobInfo.get("uncompress"), false);
+    }
+
+    /**
+     * Returns a list of LocalResources based on the blobstore map passed in.
+     *
+     * @param blobstoreMap the topology's blobstore map (blob key to per-blob settings)
+     * @return the corresponding LocalResource entries, or an empty list if the map is null
+     */
+    public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
+        List<LocalResource> localResourceList = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
+                LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue()));
+                localResourceList.add(localResource);
+            }
+        }
+        return localResourceList;
+    }
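+
+    // Illustrative sketch (an assumption, not part of this patch): a topology's blobstore map is
+    // expected to look roughly like the following config fragment, where each key is a blob key
+    // and the per-blob settings may include "localname" and "uncompress":
+    //
+    //   topology.blobstore.map:
+    //     "geo-data.zip": {"localname": "geo", "uncompress": true}
+    //     "dict.txt": {}
+    //
+    // blobstoreMapToLocalresources would then return LocalResources for "geo-data.zip" (with
+    // uncompress=true) and "dict.txt" (with uncompress=false).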
+
+    /**
+     * For each of the downloaded topologies, adds references to the blobs that the topologies are
+     * using. This is used to reconstruct the cache on restart.
+     *
+     * @param localizer the blob localizer
+     * @param stormId the topology id
+     * @param conf the supervisor config
+     */
+    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException {
+        Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        if (blobstoreMap != null) {
+            localizer.addReferences(localresources, user, topoName);
+        }
+    }
+
+    public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
+        Set<String> stormIds = new HashSet<>();
+        String path = ConfigUtils.supervisorStormDistRoot(conf);
+        Collection<String> rets = Utils.readDirContents(path);
+        for (String ret : rets) {
+            stormIds.add(URLDecoder.decode(ret));
+        }
+        return stormIds;
+    }
+
+    public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
+        String workerRoot = ConfigUtils.workerRoot(conf);
+        return Utils.readDirContents(workerRoot);
+    }
+
+    static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
+        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
+        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
+        if (!Utils.checkFileExists(stormroot)
+                || !Utils.checkFileExists(stormcodepath)
+                || !Utils.checkFileExists(stormconfpath)) {
+            return false;
+        }
+        return ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath);
+    }
+
+    /**
+     * Returns a map from worker id to the worker's most recent local heartbeat.
+     *
+     * @param conf the supervisor config
+     * @return map from worker id to heartbeat; a value may be null if the heartbeat could not be read
+     * @throws Exception on error
+     */
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
+        return _instance.readWorkerHeartbeatsImpl(conf);
+    }
+
+    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
+        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
+
+        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
+
+        for (String workerId : workerIds) {
+            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
+            // ATTENTION: whb can be null
+            workerHeartbeats.put(workerId, whb);
+        }
+        return workerHeartbeats;
+    }
+
+
+    /**
+     * Reads the local heartbeat for a single worker.
+     *
+     * @param conf the supervisor config
+     * @param workerId the worker id
+     * @return the worker heartbeat, or null if it could not be read
+     */
+    private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
+        return _instance.readWorkerHeartbeatImpl(conf, workerId);
+    }
+
+    protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> conf, String workerId) {
+        try {
+            LocalState localState = ConfigUtils.workerState(conf, workerId);
+            return localState.getWorkerHeartBeat();
+        } catch (Exception e) {
+            LOG.warn("Failed to read local heartbeat for workerId: {}. Ignoring exception.", workerId, e);
+            return null;
+        }
+    }
+
+    public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
+        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+    }
+
+    private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
+        return (now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+    }
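+
+    // Illustrative caller-side sketch (an assumption, not part of this patch): the two helpers
+    // above can be combined to find dead or timed-out workers, e.g.
+    //
+    //   int now = Time.currentTimeSecs();
+    //   for (Map.Entry<String, LSWorkerHeartbeat> entry : readWorkerHeartbeats(conf).entrySet()) {
+    //       LSWorkerHeartbeat whb = entry.getValue();
+    //       if (whb == null || isWorkerHbTimedOut(now, whb, conf)) {
+    //           // the worker with id entry.getKey() has no heartbeat or has timed out
+    //       }
+    //   }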
+    
+    /**
+     * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
+     * callback.
+     * @param command the command to be executed in the new process
+     * @param environment the environment to be applied to the process. Can be
+     *                    null.
+     * @param logPrefix a prefix for log entries from the output of the process.
+     *                  Can be null.
+     * @param exitCodeCallback code to be called passing the exit code value
+     *                         when the process completes
+     * @param dir the working directory of the new process
+     * @return the new process
+     * @throws IOException if the process cannot be started
+     * @see java.lang.ProcessBuilder
+     */
+    public static Process launchProcess(List<String> command,
+                                        Map<String,String> environment,
+                                        final String logPrefix,
+                                        final ExitCodeCallback exitCodeCallback,
+                                        File dir)
+            throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        Map<String,String> procEnv = builder.environment();
+        if (dir != null) {
+            builder.directory(dir);
+        }
+        builder.redirectErrorStream(true);
+        if (environment != null) {
+            procEnv.putAll(environment);
+        }
+        final Process process = builder.start();
+        if (logPrefix != null || exitCodeCallback != null) {
+            Utils.asyncLoop(new Callable<Object>() {
+                public Object call() {
+                    if (logPrefix != null ) {
+                        Utils.readAndLogStream(logPrefix,
+                                process.getInputStream());
+                    }
+                    if (exitCodeCallback != null) {
+                        try {
+                            process.waitFor();
+                            exitCodeCallback.call(process.exitValue());
+                        } catch (InterruptedException ie) {
+                            LOG.info("{} interrupted", logPrefix);
+                            exitCodeCallback.call(-1);
+                        }
+                    }
+                    return null; // Run only once.
+                }
+            });
+        }
+        return process;
+    }
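+
+    // Illustrative usage sketch (an assumption, not part of this patch); the command, log prefix
+    // and null arguments below are hypothetical:
+    //
+    //   Process p = SupervisorUtils.launchProcess(
+    //       Arrays.asList("java", "-version"),  // command to run
+    //       null,                               // inherit the supervisor's environment
+    //       "example-process",                  // prefix for the process output in the log
+    //       null,                               // no exit-code callback
+    //       null);                              // inherit the working directory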
+    
+    static List<ACL> supervisorZkAcls() {
+        final List<ACL> acls = new ArrayList<>();
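+        // The supervisor (the znode creator) keeps full control of its nodes; all other clients
+        // are limited to READ and CREATE (the two permission bits are disjoint, so XOR here is
+        // equivalent to OR).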
+        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
+        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        return acls;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
new file mode 100644
index 0000000..0017092
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import java.util.Map;
+
+import org.apache.storm.command.HealthCheck;
+import org.apache.storm.daemon.supervisor.Supervisor;
+
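+/**
+ * Periodically runs the configured node health checks and, if any of them report the node as
+ * unhealthy (a non-zero health code), shuts down all workers on this supervisor.
+ */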
+public class SupervisorHealthCheck implements Runnable {
+    private final Supervisor supervisor;
+
+    public SupervisorHealthCheck(Supervisor supervisor) {
+        this.supervisor = supervisor;
+    }
+
+    @Override
+    public void run() {
+        Map<String, Object> conf = supervisor.getConf();
+        int healthCode = HealthCheck.healthCheck(conf);
+        if (healthCode != 0) {
+            supervisor.shutdownAllWorkers();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
new file mode 100644
index 0000000..849c584
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
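+/**
+ * Periodically publishes this supervisor's {@link SupervisorInfo} (hostname, used and available
+ * ports, uptime, version and resource capacities) to the cluster state store.
+ */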
+public class SupervisorHeartbeat implements Runnable {
+
+    private final IStormClusterState stormClusterState;
+    private final String supervisorId;
+    private final Map<String, Object> conf;
+    private final Supervisor supervisor;
+
+    public SupervisorHeartbeat(Map<String, Object> conf, Supervisor supervisor) {
+        this.stormClusterState = supervisor.getStormClusterState();
+        this.supervisorId = supervisor.getId();
+        this.supervisor = supervisor;
+        this.conf = conf;
+    }
+
+    private SupervisorInfo buildSupervisorInfo(Map<String, Object> conf, Supervisor supervisor) {
+        SupervisorInfo supervisorInfo = new SupervisorInfo();
+        supervisorInfo.set_time_secs(Time.currentTimeSecs());
+        supervisorInfo.set_hostname(supervisor.getHostName());
+        supervisorInfo.set_assignment_id(supervisor.getAssignmentId());
+
+        List<Long> usedPorts = new ArrayList<>();
+        usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
+        supervisorInfo.set_used_ports(usedPorts);
+        List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
+        List<Long> portList = new ArrayList<>();
+        if (metaDatas != null){
+            for (Object data : metaDatas){
+                Integer port = Utils.getInt(data);
+                if (port != null)
+                    portList.add(port.longValue());
+            }
+        }
+
+        supervisorInfo.set_meta(portList);
+        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
+        supervisorInfo.set_uptime_secs(supervisor.getUpTime().upTime());
+        supervisorInfo.set_version(supervisor.getStormVersion());
+        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
+        return supervisorInfo;
+    }
+
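+    // Capacities advertised to the resource-aware scheduler. The defaults below (4096 MB of
+    // memory and 400 CPU points, i.e. roughly four cores at 100 points per core) are used when
+    // the corresponding config values are not set.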
+    private Map<String, Double> mkSupervisorCapacities(Map<String, Object> conf) {
+        Map<String, Double> ret = new HashMap<>();
+        Double mem = Utils.getDouble(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB), 4096.0);
+        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+        Double cpu = Utils.getDouble(conf.get(Config.SUPERVISOR_CPU_CAPACITY), 400.0);
+        ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+        return ret;
+    }
+
+    @Override
+    public void run() {
+        SupervisorInfo supervisorInfo = buildSupervisorInfo(conf, supervisor);
+        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
new file mode 100644
index 0000000..0b6d996
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Downloads all blobs listed in the topology configuration for all topologies assigned to this
+ * supervisor, and creates version files with a suffix. The Runnable is intended to be run
+ * periodically by a timer, created elsewhere.
+ */
+public class UpdateBlobs implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
+
+    private Supervisor supervisor;
+
+    public UpdateBlobs(Supervisor supervisor) {
+        this.supervisor = supervisor;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<String, Object> conf = supervisor.getConf();
+            Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
+            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
+            Set<String> assignedStormIds = new HashSet<>();
+            for (LocalAssignment localAssignment : newAssignment.get().values()) {
+                assignedStormIds.add(localAssignment.get_topology_id());
+            }
+            for (String stormId : downloadedStormIds) {
+                if (assignedStormIds.contains(stormId)) {
+                    String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+                    LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
+                    updateBlobsForTopology(conf, stormId, supervisor.getLocalizer());
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                LOG.error("Network error while updating blobs, will retry again later", e);
+            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    /**
+     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
+     *
+     * @param conf the supervisor config
+     * @param stormId the topology id
+     * @param localizer the blob localizer
+     * @throws IOException on error reading the topology config
+     */
+    private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer) throws IOException {
+        Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        try {
+            localizer.updateBlobs(localresources, user);
+        } catch (AuthorizationException authExp) {
+            LOG.error("AuthorizationException error", authExp);
+        } catch (KeyNotFoundException knf) {
+            LOG.error("KeyNotFoundException error", knf);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/event/EventManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/event/EventManager.java b/storm-core/src/jvm/org/apache/storm/event/EventManager.java
new file mode 100644
index 0000000..64536c1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/event/EventManager.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.event;
+
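+/**
+ * A simple asynchronous event executor: Runnables are added from any thread and executed, in the
+ * order they were added, by the implementation (typically on a dedicated background thread).
+ */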
+public interface EventManager extends AutoCloseable {
+    void add(Runnable eventFn);
+
+    boolean waiting();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java b/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
new file mode 100644
index 0000000..42e6d6b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/event/EventManagerImp.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.event;
+
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
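+/**
+ * Single-threaded {@link EventManager} backed by a blocking queue.
+ *
+ * <p>An illustrative usage sketch (an assumption, not taken from this patch):
+ * <pre>
+ *   EventManager em = new EventManagerImp(false);
+ *   em.add(new Runnable() {
+ *       public void run() {
+ *           // work that should happen on the event manager thread
+ *       }
+ *   });
+ *   // ... on shutdown (close() is declared to throw Exception):
+ *   em.close();
+ * </pre>
+ */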
+public class EventManagerImp implements EventManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
+
+    private AtomicInteger added;
+    private AtomicInteger processed;
+    private AtomicBoolean running;
+    private Thread runner;
+
+    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
+    public EventManagerImp(boolean isDaemon) {
+        added = new AtomicInteger();
+        processed = new AtomicInteger();
+        running = new AtomicBoolean(true);
+        runner = new Thread() {
+            @Override
+            public void run() {
+                while (running.get()) {
+                    try {
+                        Runnable r = queue.take();
+                        if (r == null) {
+                            return;
+                        }
+
+                        r.run();
+                        processInc();
+                    } catch (Throwable t) {
+                        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
+                            LOG.info("Event manager interrupted while doing IO");
+                        } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
+                            LOG.info("Event manager interrupted");
+                        } else {
+                            LOG.error("Error when processing event", t);
+                            Utils.exitProcess(20, "Error when processing an event");
+                        }
+                    }
+                }
+            }
+        };
+        runner.setDaemon(isDaemon);
+        runner.start();
+    }
+
+    public void processInc() {
+        processed.incrementAndGet();
+    }
+
+    @Override
+    public void add(Runnable eventFn) {
+        if (!running.get()) {
+            throw new RuntimeException("Cannot add events to a shutdown event manager");
+        }
+        added.incrementAndGet();
+        queue.add(eventFn);
+    }
+
+    @Override
+    public boolean waiting() {
+        return (Time.isThreadWaiting(runner) || (processed.get() == added.get()));
+    }
+
+    @Override
+    public void close() throws Exception {
+        running.set(false);
+        runner.interrupt();
+        runner.join();
+    }
+}

