storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [01/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:35 GMT
Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch cea2464bf -> 5e7476285


http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
new file mode 100644
index 0000000..3f34552
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.junit.Test;
+import org.yaml.snakeyaml.Yaml;
+
+import com.google.common.base.Joiner;
+
+public class ContainerTest {
+    public static class MockContainer extends Container {
+        
+        protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+                LocalAssignment assignment, String workerId,
+                Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException {
+            super(type, conf, supervisorId, port, assignment, workerId, topoConf, ops);
+        }
+
+        public final List<Long> killedPids = new ArrayList<>();
+        public final List<Long> forceKilledPids = new ArrayList<>();
+        public final Set<Long> allPids = new HashSet<>();
+
+        @Override
+        protected void kill(long pid) {
+            killedPids.add(pid);
+        }
+        
+        @Override
+        protected void forceKill(long pid) {
+            forceKilledPids.add(pid);
+        }
+        
+        @Override
+        protected Set<Long> getAllPids() throws IOException {
+            return allPids;
+        }
+        
+        @Override
+        public void launch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public void relaunch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public boolean didMainProcessExit() {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
+        }
+
+        @Override
+        public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
+        }
+    }
+
+    @Test
+    public void testKill() throws Exception {
+        final String topoId = "test_topology";
+        final Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", 8080, la, "worker", new HashMap<String, Object>(), ops);
+        mc.kill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        mc.forceKill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        
+        long pid = 987654321;
+        mc.allPids.add(pid);
+        
+        mc.kill();
+        assertEquals(mc.allPids, new HashSet<>(mc.killedPids));
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        mc.killedPids.clear();
+        
+        mc.forceKill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids));
+    }
+    
+    private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls();
+    private static final String DOUBLE_SEP = File.separator + File.separator;    
+    static String asAbsPath(String ... parts) {
+        return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, File.separator);
+    }
+    
+    static File asAbsFile(String ... parts) {
+        return new File(asAbsPath(parts));
+    }
+    
+    static String asPath(String ... parts) {
+        return PATH_JOIN.join(parts);
+    }
+    
+    public static File asFile(String ... parts) {
+        return new File(asPath(parts));
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSetup() throws Exception {
+        final int port = 8080;
+        final String topoId = "test_topology";
+        final String workerId = "worker_id";
+        final String user = "me";
+        final String stormLocal = asAbsPath("tmp", "testing");
+        final File workerArtifacts = asAbsFile(stormLocal, topoId, String.valueOf(port));
+        final File logMetadataFile = new File(workerArtifacts, "worker.yaml");
+        final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId);
+        final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
+        final File distRoot = asAbsFile(stormLocal, "supervisor", "stormdist", topoId);
+        
+        final Map<String, Object> topoConf = new HashMap<>();
+        final List<String> topoUsers = Arrays.asList("t-user-a", "t-user-b");
+        final List<String> logUsers = Arrays.asList("l-user-a", "l-user-b");
+        
+        final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b");
+        final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
+        
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        topoConf.put(Config.LOGS_GROUPS, logGroups);
+        topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
+        topoConf.put(Config.LOGS_USERS, logUsers);
+        topoConf.put(Config.TOPOLOGY_USERS, topoUsers);
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        final StringWriter yamlDump = new StringWriter();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.fileExists(workerArtifacts)).thenReturn(true);
+        when(ops.fileExists(workerRoot)).thenReturn(true);
+        when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", 8080, la, workerId, topoConf, ops);
+        
+        mc.setup();
+        
+        //Initial Setup
+        verify(ops).forceMkdir(new File(workerRoot, "pids"));
+        verify(ops).forceMkdir(new File(workerRoot, "tmp"));
+        verify(ops).forceMkdir(new File(workerRoot, "heartbeats"));
+        verify(ops).fileExists(workerArtifacts);
+        
+        //Log file permissions
+        verify(ops).getWriter(logMetadataFile);
+        
+        String yamlResult = yamlDump.toString();
+        Yaml yaml = new Yaml();
+        Map<String, Object> result = (Map<String, Object>) yaml.load(yamlResult);
+        assertEquals(workerId, result.get("worker-id"));
+        assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER));
+        HashSet<String> allowedUsers = new HashSet<>(topoUsers);
+        allowedUsers.addAll(logUsers);
+        assertEquals(allowedUsers, new HashSet<String>((List<String>)result.get(Config.LOGS_USERS)));
+        
+        HashSet<String> allowedGroups = new HashSet<>(topoGroups);
+        allowedGroups.addAll(logGroups);
+        assertEquals(allowedGroups, new HashSet<String>((List<String>)result.get(Config.LOGS_GROUPS)));
+        
+        //Save the current user to help with recovery
+        verify(ops).dump(workerUserFile, user);
+        
+        //Create links to artifacts dir
+        verify(ops).createSymlink(new File(workerRoot, "artifacts"), workerArtifacts);
+        
+        //Create links to blobs 
+        verify(ops).createSymlink(new File(workerRoot, "resources"), new File(distRoot, "resources"));
+    }
+    
+    @Test
+    public void testCleanup() throws Exception {
+        final int port = 8080;
+        final long pid = 100;
+        final String topoId = "test_topology";
+        final String workerId = "worker_id";
+        final String user = "me";
+        final String stormLocal = asAbsPath("tmp", "testing");
+        final File workerArtifacts = asAbsFile(stormLocal, topoId, String.valueOf(port));
+        final File logMetadataFile = new File(workerArtifacts, "worker.yaml");
+        final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId);
+        final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
+        final File workerPidsRoot = new File(workerRoot, "pids");
+        
+        final Map<String, Object> topoConf = new HashMap<>();
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        final StringWriter yamlDump = new StringWriter();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.fileExists(workerArtifacts)).thenReturn(true);
+        when(ops.fileExists(workerRoot)).thenReturn(true);
+        when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, workerId, topoConf, ops);
+        mc.allPids.add(pid);
+        
+        mc.cleanUp();
+        verify(ops).deleteIfExists(eq(new File(workerPidsRoot, String.valueOf(pid))), eq(user), any(String.class));
+        
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), any(String.class));
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "tmp")), eq(user), any(String.class));
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "heartbeats")), eq(user), any(String.class));
+        verify(ops).deleteIfExists(eq(workerRoot), eq(user), any(String.class));
+        verify(ops).deleteIfExists(workerUserFile);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
new file mode 100644
index 0000000..24ccda5
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.daemon.supervisor.Slot.StaticState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.daemon.supervisor.Slot.DynamicState;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+public class SlotTest {
+    static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap) {
+        WorkerResources resources = new WorkerResources();
+        if (cpu != null) {
+            resources.set_cpu(cpu);
+        }
+        
+        if (mem_on_heap != null) {
+            resources.set_mem_on_heap(mem_on_heap);
+        }
+        
+        if (mem_off_heap != null) {
+            resources.set_mem_off_heap(mem_off_heap);
+        }
+        return resources;
+    }
+    
+    static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) {
+        LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
+        ret.set_topology_id(id);
+        ret.set_port(port);
+        ret.set_executors(exec);
+        ret.set_time_secs(timeSecs);
+        return ret;
+    }
+    
+    static List<ExecutorInfo> mkExecutorInfoList(int ... executors) {
+        ArrayList<ExecutorInfo> ret = new ArrayList<>(executors.length);
+        for (int exec : executors) {
+            ExecutorInfo execInfo = new ExecutorInfo();
+            execInfo.set_task_start(exec);
+            execInfo.set_task_end(exec);
+            ret.add(execInfo);
+        }
+        return ret;
+    }
+    
+    static LocalAssignment mkLocalAssignment(String id, List<ExecutorInfo> exec, WorkerResources resources) {
+        LocalAssignment ret = new LocalAssignment();
+        ret.set_topology_id(id);
+        ret.set_executors(exec);
+        if (resources != null) {
+            ret.set_resources(resources);
+        }
+        return ret;
+    }
+    
+    @Test
+    public void testEquivilant() {
+        LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0));
+        LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0));
+
+        assertTrue(Slot.equivalent(null, null));
+        assertTrue(Slot.equivalent(a, a));
+        assertTrue(Slot.equivalent(b, bReordered));
+        assertTrue(Slot.equivalent(bReordered, b));
+        
+        assertFalse(Slot.equivalent(a, aResized));
+        assertFalse(Slot.equivalent(aResized, a));
+        assertFalse(Slot.equivalent(a, null));
+        assertFalse(Slot.equivalent(null, b));
+        assertFalse(Slot.equivalent(a, b));
+    }
+    
+    @Test
+    public void testEmptyToEmpty() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            ILocalizer localizer = mock(ILocalizer.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000,
+                    containerLauncher, "localhost", 8080, iSuper, state);
+            DynamicState dynamicState = new DynamicState(null, null, null);
+            DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertTrue(Time.currentTimeMillis() > 1000);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+    
+    @Test
+    public void testLaunchContainerFromEmpty() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            int port = 8080;
+            String topoId = "NEW";
+            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment newAssignment = 
+                    mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container container = mock(Container.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            when(containerLauncher.launchContainer(port, newAssignment, state)).thenReturn(container);
+            LSWorkerHeartbeat hb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
+            when(container.readHeartbeat()).thenReturn(hb, hb);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> baseFuture = mock(Future.class);
+            when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> blobFuture = mock(Future.class);
+            when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(null, null, null)
+                    .withNewAssignment(newAssignment);
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            verify(localizer).requestDownloadBaseTopologyBlobs(newAssignment, port);
+            assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, nextState.state);
+            assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            verify(baseFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(localizer).requestDownloadTopologyBlobs(newAssignment, port);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+            assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(containerLauncher).launchContainer(port, newAssignment, state);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+
+
+    @Test
+    public void testRelaunch() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            int port = 8080;
+            String topoId = "CURRENT";
+            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment assignment = 
+                    mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container container = mock(Container.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10);
+            LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
+            when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, goodhb);
+            when(container.areAllProcessesDead()).thenReturn(false, true);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(assignment, container, assignment);
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
+            verify(container).kill();
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
+            verify(container).forceKill();
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+            verify(container).relaunch();
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+    
+    @Test
+    public void testReschedule() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
+            when(cContainer.readHeartbeat()).thenReturn(chb);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+            
+            String nTopoId = "NEW";
+            List<ExecutorInfo> nExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment nAssignment = 
+                    mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container nContainer = mock(Container.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            when(containerLauncher.launchContainer(port, nAssignment, state)).thenReturn(nContainer);
+            LSWorkerHeartbeat nhb = mkWorkerHB(nTopoId, 100, nExecList, Time.currentTimeSecs());
+            when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> baseFuture = mock(Future.class);
+            when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> blobFuture = mock(Future.class);
+            when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment);
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).kill();
+            verify(localizer).requestDownloadBaseTopologyBlobs(nAssignment, port);
+            assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).forceKill();
+            assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, nextState.state);
+            verify(cContainer).cleanUp();
+            verify(localizer).releaseSlotFor(cAssignment, port);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+            verify(baseFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(localizer).requestDownloadTopologyBlobs(nAssignment, port);
+            assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(containerLauncher).launchContainer(port, nAssignment, state);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 4000);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+
+    
+    @Test
+    public void testRunningToEmpty() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
+            when(cContainer.readHeartbeat()).thenReturn(chb);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null);
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).kill();
+            verify(localizer, never()).requestDownloadBaseTopologyBlobs(null, port);
+            assertSame("pendingDownload not set properly", null, nextState.pendingDownload);
+            assertEquals(null, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).forceKill();
+            assertSame("pendingDownload not set properly", null, nextState.pendingDownload);
+            assertEquals(null, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.EMPTY, nextState.state);
+            verify(cContainer).cleanUp();
+            verify(localizer).releaseSlotFor(cAssignment, port);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 3000);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+    
+    @Test
+    public void testRunWithProfileActions() throws Exception {
+        Time.startSimulatingAutoAdvanceOnSleep(1010);
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()+100); //NOT going to timeout for a while
+            when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb);
+            when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true);
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            Set<TopoProfileAction> profileActions = new HashSet<>();
+            ProfileRequest request = new ProfileRequest();
+            request.set_action(ProfileAction.JPROFILE_STOP);
+            NodeInfo info = new NodeInfo();
+            info.set_node("localhost");
+            info.add_to_port(port);
+            request.set_nodeInfo(info);
+            request.set_time_stamp(Time.currentTimeMillis() + 3000);//3 seconds from now
+            
+            TopoProfileAction profile = new TopoProfileAction(cTopoId, request);
+            profileActions.add(profile);
+            Set<TopoProfileAction> expectedPending = new HashSet<>();
+            expectedPending.add(profile);
+            
+            
+            DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withProfileActions(profileActions, Collections.<TopoProfileAction> emptySet());
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            verify(cContainer).runProfiling(request, false);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            verify(cContainer).runProfiling(request, true);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 4000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 5000);
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
new file mode 100644
index 0000000..ed2b4ff
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.Test;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.apache.storm.utils.Utils;
+
+public class AsyncLocalizerTest {
+
+    @Test
+    public void testRequestDownloadBaseTopologyBlobs() throws Exception {
+        final String topoId = "TOPO";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        ExecutorInfo ei = new ExecutorInfo();
+        ei.set_task_start(1);
+        ei.set_task_end(1);
+        la.add_to_executors(ei);
+        final int port = 8080;
+        final String jarKey = topoId + "-stormjar.jar";
+        final String codeKey = topoId + "-stormcode.ser";
+        final String confKey = topoId + "-stormconf.ser";
+        final String stormLocal = "/tmp/storm-local/";
+        final String stormRoot = stormLocal+topoId+"/";
+        final File fStormRoot = new File(stormRoot);
+        ClientBlobStore blobStore = mock(ClientBlobStore.class);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.SUPERVISOR_BLOBSTORE, ClientBlobStore.class.getName());
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+        conf.put(Config.STORM_CLUSTER_MODE, "distributed");
+        conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        Localizer localizer = mock(Localizer.class);
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        ConfigUtils mockedCU = mock(ConfigUtils.class);
+        Utils mockedU = mock(Utils.class);
+        
+        Map<String, Object> topoConf = new HashMap<>(conf);
+        
+        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+        Utils origUtils = Utils.setInstance(mockedU);
+        try {
+            when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
+            when(mockedCU.supervisorLocalDirImpl(conf)).thenReturn(stormLocal);
+            when(mockedU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
+            when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+
+            Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+            f.get(20, TimeUnit.SECONDS);
+            // We should be done now...
+            
+            verify(blobStore).prepare(conf);
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(jarKey), startsWith(stormLocal), eq(blobStore));
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(codeKey), startsWith(stormLocal), eq(blobStore));
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(confKey), startsWith(stormLocal), eq(blobStore));
+            verify(blobStore).shutdown();
+            //Extracting the dir from the jar
+            verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), eq("resources"), any(File.class));
+            verify(ops).moveDirectoryPreferAtomic(any(File.class), eq(fStormRoot));
+            verify(ops).setupStormCodeDir(topoConf, fStormRoot);
+            
+            verify(ops, never()).deleteIfExists(any(File.class));
+        } finally {
+            al.shutdown();
+            ConfigUtils.setInstance(orig);
+            Utils.setInstance(origUtils);
+        }
+    }
+
+    @Test
+    public void testRequestDownloadTopologyBlobs() throws Exception {
+        final String topoId = "TOPO-12345";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        ExecutorInfo ei = new ExecutorInfo();
+        ei.set_task_start(1);
+        ei.set_task_end(1);
+        la.add_to_executors(ei);
+        final String topoName = "TOPO";
+        final int port = 8080;
+        final String user = "user";
+        final String simpleLocalName = "simple.txt";
+        final String simpleKey = "simple";
+        
+        final String stormLocal = "/tmp/storm-local/";
+        final File userDir = new File(stormLocal, user);
+        final String stormRoot = stormLocal+topoId+"/";
+        
+        final String localizerRoot = "/tmp/storm-localizer/";
+        final String simpleLocalFile = localizerRoot + user + "/simple";
+        final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current";
+       
+        final StormTopology st = new StormTopology();
+        st.set_spouts(new HashMap<String, SpoutSpec>());
+        st.set_bolts(new HashMap<String, Bolt>());
+        st.set_state_spouts(new HashMap<String, StateSpoutSpec>());
+ 
+        Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+        Map<String, Object> simple = new HashMap<>();
+        simple.put("localname", simpleLocalName);
+        simple.put("uncompress", false);
+        topoBlobMap.put(simpleKey, simple);
+        
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        Localizer localizer = mock(Localizer.class);
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        ConfigUtils mockedCU = mock(ConfigUtils.class);
+        Utils mockedU = mock(Utils.class);
+        
+        Map<String, Object> topoConf = new HashMap<>(conf);
+        topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        topoConf.put(Config.TOPOLOGY_NAME, topoName);
+        
+        List<LocalizedResource> localizedList = new ArrayList<>();
+        LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false);
+        localizedList.add(simpleLocal);
+        
+        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+        Utils origUtils = Utils.setInstance(mockedU);
+        try {
+            when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
+            when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+            when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
+            
+            when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
+            
+            when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList);
+            
+            Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+            f.get(20, TimeUnit.SECONDS);
+            // We should be done now...
+            
+            verify(localizer).getLocalUserFileCacheDir(user);
+            verify(ops).fileExists(userDir);
+            verify(ops).forceMkdir(userDir);
+            
+            verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir));
+
+            verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
+        } finally {
+            al.shutdown();
+            ConfigUtils.setInstance(orig);
+            Utils.setInstance(origUtils);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index ab2c9af..04b5ab2 100644
--- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -34,8 +34,8 @@ public class LocalizedResourceRetentionSetTest {
     // check adding reference to local resource with topology of same name
     localresource2.addReference(("topo2"));
 
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, false);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, false);
     lrretset.addResources(lrset);
     assertEquals("number to clean is not 0", 0, lrretset.getSizeWithNoReferences());
     localresource1.removeReference(("topo1"));
@@ -64,9 +64,9 @@ public class LocalizedResourceRetentionSetTest {
     // check adding reference to local resource with topology of same name
     localresource2.addReference(("topo1"));
     localresource2.setSize(10);
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, false);
-    lrset.addResource("archive1", archiveresource1, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, false);
+    lrset.add("archive1", archiveresource1, true);
 
     lrretset.addResources(lrset);
     assertEquals("number to clean is not 2", 2, lrretset.getSizeWithNoReferences());

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
index 7f20d19..550d695 100644
--- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
+++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
@@ -37,9 +37,9 @@ public class LocalizedResourceSetTest {
     LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1");
     assertEquals("size is wrong", 0, lrset.getSize());
-    lrset.addResource("key1", localresource1, false);
+    lrset.add("key1", localresource1, false);
     assertEquals("size is wrong", 1, lrset.getSize());
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key2", localresource2, true);
     assertEquals("size is wrong", 2, lrset.getSize());
   }
 
@@ -48,8 +48,8 @@ public class LocalizedResourceSetTest {
     LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
     LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1");
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, true);
     assertEquals("get doesn't return same object", localresource1, lrset.get("key1", false));
     assertEquals("get doesn't return same object", localresource2, lrset.get("key2", true));
 
@@ -60,8 +60,8 @@ public class LocalizedResourceSetTest {
     LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
     LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1");
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, true);
     assertEquals("doesn't exist", true, lrset.exists("key1", false));
     assertEquals("doesn't exist", true, lrset.exists("key2", true));
     boolean val = lrset.remove(localresource1);

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java
index 45ba108..613e165 100644
--- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizerTest.java
@@ -259,7 +259,7 @@ public class LocalizerTest {
 
   // archive passed in must contain symlink named tmptestsymlink if not a zip file
   public void testArchives(String archivePath, boolean supportSymlinks, int size) throws Exception {
-    if (isOnWindows()) {
+    if (Utils.isOnWindows()) {
       // Windows should set this to false cause symlink in compressed file doesn't work properly.
       supportSymlinks = false;
     }
@@ -669,11 +669,4 @@ public class LocalizerTest {
     assertEquals("blob version not correct", 3, Utils.localVersionOfBlob(keyFile.toString()));
     assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists());
   }
-
-  private boolean isOnWindows() {
-    if (System.getenv("OS") != null) {
-      return System.getenv("OS").equals("Windows_NT");
-    }
-    return false;
-  }
 }


Mime
View raw message