storm-commits mailing list archives

From kabh...@apache.org
Subject [07/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:41 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
new file mode 100644
index 0000000..e337b1f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ZKStateStorage implements IStateStorage {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
+
+    private ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<String, ZKStateChangedCallback>();
+    private CuratorFramework zkWriter;
+    private CuratorFramework zkReader;
+    private AtomicBoolean active;
+
+    private boolean isNimbus;
+    private Map authConf;
+    private Map<Object, Object> conf;
+
+    private class ZkWatcherCallBack implements WatcherCallBack {
+        @Override
+        public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+            if (active.get()) {
+                if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                    LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+                } else {
+                    LOG.debug("Received event {} : {} : {}", state, type, path);
+                }
+
+                if (!type.equals(Watcher.Event.EventType.None)) {
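+                    // Fan the event out to every registered callback; each
+                    // callback decides for itself whether the path is relevant.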
+                    for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) {
+                        ZKStateChangedCallback fn = e.getValue();
+                        fn.changed(type, path);
+                    }
+                }
+            }
+        }
+    }
+
+    public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        this.conf = conf;
+        this.authConf = authConf;
+        if (context.getDaemonType().equals(DaemonType.NIMBUS)) {
+            this.isNimbus = true;
+        }
+
+        // just mkdir STORM_ZOOKEEPER_ROOT dir
+        CuratorFramework zkTemp = mkZk();
+        String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+        Zookeeper.mkdirs(zkTemp, rootPath, acls);
+        zkTemp.close();
+
+        active = new AtomicBoolean(true);
+        zkWriter = mkZk(new ZkWatcherCallBack());
+        if (isNimbus) {
+            zkReader = mkZk(new ZkWatcherCallBack());
+        } else {
+            zkReader = zkWriter;
+        }
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk() throws IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
+                new DefaultWatcherCallBack(), authConf);
+    }
+
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+    }
+
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+        Zookeeper.deleteNodeBlobstore(zkWriter, path, nimbusHostPortInfo);
+    }
+
+    @Override
+    public String register(ZKStateChangedCallback callback) {
+        String id = UUID.randomUUID().toString();
+        this.callbacks.put(id, callback);
+        return id;
+    }
+
+    @Override
+    public void unregister(String id) {
+        this.callbacks.remove(id);
+    }
+
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+    }
+
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, path, acls);
+    }
+
+    @Override
+    public void delete_node(String path) {
+        Zookeeper.deleteNode(zkWriter, path);
+    }
+
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            try {
+                Zookeeper.setData(zkWriter, path, data);
+            } catch (RuntimeException e) {
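+                // The node can vanish between the exists() check above and
+                // setData(); recover by recreating the ephemeral node.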
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                    Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+                } else {
+                    throw e;
+                }
+            }
+
+        } else {
+            Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+        }
+    }
+
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        return Zookeeper.getVersion(zkReader, path, watch);
+    }
+
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return Zookeeper.existsNode(zkReader, path, watch);
+    }
+
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return Zookeeper.getChildren(zkReader, path, watch);
+    }
+
+    @Override
+    public void close() {
+        this.active.set(false);
+        zkWriter.close();
+        if (isNimbus) {
+            zkReader.close();
+        }
+    }
+
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            Zookeeper.setData(zkWriter, path, data);
+        } else {
+            Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
+            try {
+                Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+            } catch (RuntimeException e) {
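+                // Another writer can create the node between the exists() check
+                // and createNode(); fall back to setData() in that case.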
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
+                    Zookeeper.setData(zkWriter, path, data);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        return Zookeeper.getData(zkReader, path, watch);
+    }
+
+    @Override
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
+        return Zookeeper.getDataWithVersion(zkReader, path, watch);
+    }
+
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        set_data(path, data, acls);
+    }
+
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        return Zookeeper.getData(zkReader, path, watch);
+    }
+
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        return get_children(path, watch);
+    }
+
+    @Override
+    public void delete_worker_hb(String path) {
+        delete_node(path);
+    }
+
+    @Override
+    public void add_listener(final ConnectionStateListener listener) {
+        Zookeeper.addListener(zkReader, listener);
+    }
+
+    @Override
+    public void sync_path(String path) {
+        Zookeeper.syncPath(zkWriter, path);
+    }
+}
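
A minimal usage sketch of the storage API above (illustrative only, not part
of this commit; the raw conf map, the null ACL list, and the DaemonType choice
are assumptions for the sketch):

    Map conf = ConfigUtils.readStormConfig();
    List<ACL> acls = null; // open ACLs, for the sketch only
    IStateStorage storage = new ZKStateStorage(conf, conf, acls,
            new ClusterStateContext(DaemonType.SUPERVISOR));
    String id = storage.register(new ZKStateChangedCallback() {
        @Override
        public void changed(Watcher.Event.EventType type, String path) {
            System.out.println("ZK event " + type + " at " + path);
        }
    });
    storage.set_data("/sketch", "hello".getBytes(), acls);
    byte[] data = storage.get_data("/sketch", true); // true => leave a watch
    storage.unregister(id);
    storage.close();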

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
new file mode 100644
index 0000000..73bcea0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.storm.Config;
+
+import org.apache.storm.utils.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class HealthCheck {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class);
+    private static final String FAILED = "failed";
+    private static final String SUCCESS = "success";
+    private static final String TIMEOUT = "timeout";
+    private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
+
+    public static int healthCheck(Map<String, Object> conf) {
+        String healthDir = ConfigUtils.absoluteHealthCheckDir(conf);
+        List<String> results = new ArrayList<>();
+        if (healthDir != null) {
+            File parentFile = new File(healthDir);
+            List<String> healthScripts = new ArrayList<String>();
+            if (parentFile.exists()) {
+                File[] list = parentFile.listFiles();
+                if (list != null) {
+                    for (File f : list) {
+                        if (!f.isDirectory() && f.canExecute()) {
+                            healthScripts.add(f.getAbsolutePath());
+                        }
+                    }
+                }
+            }
+            for (String script : healthScripts) {
+                String result = processScript(conf, script);
+                results.add(result);
+            }
+        }
+
+        // failed_with_exit_code is OK. We're mimicking Hadoop's health checks.
+        // We treat non-zero exit codes as indicators that the scripts failed
+        // to execute properly, not that the system is unhealthy, in which case
+        // we don't want to start killing things.
+
+        if (results.contains(FAILED) || results.contains(TIMEOUT)) {
+            return 1;
+        } else {
+            return 0;
+        }
+
+    }
+
+    public static String processScript(Map<String, Object> conf, String script) {
+        Thread interruptThread = null;
+        try {
+            Process process = Runtime.getRuntime().exec(script);
+            final long timeout = ((Number) conf.get(Config.STORM_HEALTH_CHECK_TIMEOUT_MS)).longValue();
+            final Thread curThread = Thread.currentThread();
+            // interrupt the waiting thread if the script exceeds the timeout
+            interruptThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(timeout);
+                        curThread.interrupt();
+                    } catch (InterruptedException e) {
+                        // Ignored
+                    }
+                }
+            });
+            interruptThread.start();
+            process.waitFor();
+            interruptThread.interrupt();
+            Thread.interrupted(); // clear any interrupt left over from the watchdog
+
+            if (process.exitValue() != 0) {
+                return FAILED_WITH_EXIT_CODE;
+            }
+            // A zero exit code means the script ran; it reports an unhealthy
+            // node by printing a line that starts with "ERROR".
+            String str;
+            InputStream stdin = process.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
+            while ((str = reader.readLine()) != null) {
+                if (str.startsWith("ERROR")) {
+                    return FAILED;
+                }
+            }
+            return SUCCESS;
+        } catch (InterruptedException e) {
+            LOG.warn("Script:  {} timed out.", script);
+            return TIMEOUT;
+        } catch (Exception e) {
+            LOG.warn("Script failed with exception: ", e);
+            return FAILED_WITH_EXIT_CODE;
+        } finally {
+            if (interruptThread != null)
+                interruptThread.interrupt();
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        System.exit(healthCheck(conf));
+    }
+}
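
A stripped-down illustration of the watchdog pattern processScript uses
(illustrative only, not part of this commit; /bin/sleep and the 2 second
timeout are stand-ins):

    Process p = Runtime.getRuntime().exec(new String[] {"/bin/sleep", "10"});
    final Thread waiter = Thread.currentThread();
    Thread watchdog = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);   // the timeout
                waiter.interrupt();   // break the waitFor() below
            } catch (InterruptedException e) {
                // normal path: the child process finished first
            }
        }
    });
    watchdog.start();
    try {
        p.waitFor();                  // child beat the timeout
        watchdog.interrupt();         // shut the watchdog down
    } catch (InterruptedException e) {
        p.destroy();                  // timed out
    }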

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
new file mode 100644
index 0000000..1fcd81b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.storm.Config;
+
+import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.utils.ConfigUtils;
+
+public class KillWorkers {
+    public static void main(String [] args) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
+        try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
+            supervisor.shutdownAllWorkers();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
new file mode 100644
index 0000000..d1b71a7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+public interface DaemonCommon {
+    public boolean isWaiting();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
index 56b920b..eb72939 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/MetricsUtils.java
@@ -20,6 +20,7 @@ package org.apache.storm.daemon.metrics;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,8 +86,8 @@ public class MetricsUtils {
     public static File getCsvLogDir(Map stormConf) {
         String csvMetricsLogDirectory = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null);
         if (csvMetricsLogDirectory == null) {
-            csvMetricsLogDirectory = absoluteStormLocalDir(stormConf);
-            csvMetricsLogDirectory = csvMetricsLogDirectory + File.separator + "csvmetrics";
+            csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf);
+            csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics";
         }
         File csvMetricsDir = new File(csvMetricsLogDirectory);
         validateCreateOutputDir(csvMetricsDir);
@@ -104,18 +105,4 @@ public class MetricsUtils {
             throw new IllegalStateException(dir.getName() + " is not a directory.");
         }
     }
-
-    public static String absoluteStormLocalDir(Map conf) {
-        String stormHome = System.getProperty("storm.home");
-        String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
-        if (localDir == null) {
-            return (stormHome + File.separator + "storm-local");
-        } else {
-            if (new File(localDir).isAbsolute()) {
-                return localDir;
-            } else {
-                return (stormHome + File.separator + localDir);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
new file mode 100644
index 0000000..e531b84
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AdvancedFSOps {
+    private static final Logger LOG = LoggerFactory.getLogger(AdvancedFSOps.class);
+    
+    /**
+     * Factory to create a new AdvancedFSOps
+     * @param conf the configuration of the process
+     * @return the appropriate instance of the class for this config and environment.
+     */
+    public static AdvancedFSOps make(Map<String, Object> conf) {
+        if (Utils.isOnWindows()) {
+            return new AdvancedWindowsFSOps(conf);
+        }
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            return new AdvancedRunAsUserFSOps(conf);
+        }
+        return new AdvancedFSOps();
+    }
+    
+    private static class AdvancedRunAsUserFSOps extends AdvancedFSOps {
+        private final Map<String, Object> _conf;
+        
+        public AdvancedRunAsUserFSOps(Map<String, Object> conf) {
+            if (Utils.isOnWindows()) {
+                throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+            _conf = conf;
+        }
+        
+        @Override
+        public void setupBlobPermissions(File path, String user) throws IOException {
+            String logPrefix = "setup blob permissions for " + path;
+            SupervisorUtils.processLauncherAndWait(_conf, user, Arrays.asList("blob", path.toString()), null, logPrefix);
+        }
+        
+        @Override
+        public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+            String absolutePath = path.getAbsolutePath();
+            LOG.info("Deleting path {}", absolutePath);
+            if (user == null) {
+                user = Files.getOwner(path.toPath()).getName();
+            }
+            List<String> commands = new ArrayList<>();
+            commands.add("rmr");
+            commands.add(absolutePath);
+            SupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+            if (Utils.checkFileExists(absolutePath)) {
+                throw new RuntimeException(path + " was not deleted.");
+            }
+        }
+        
+        @Override
+        public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+            SupervisorUtils.setupStormCodeDir(_conf, topologyConf, path.getCanonicalPath());
+        }
+
+        @Override
+        public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+            SupervisorUtils.setupWorkerArtifactsDir(_conf, topologyConf, path.getCanonicalPath());
+        }
+    }
+    
+    /**
+     * Operations that need to override the default ones when running on Windows
+     *
+     */
+    private static class AdvancedWindowsFSOps extends AdvancedFSOps {
+
+        public AdvancedWindowsFSOps(Map<String, Object> conf) {
+            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                throw new RuntimeException("ERROR: Windows doesn't support running workers as different users yet");
+            }
+        }
+        
+        @Override
+        public void restrictDirectoryPermissions(File dir) throws IOException {
+            //NOOP, if windows gets support for run as user we will need to find a way to support this
+        }
+        
+        @Override
+        public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // This is not atomic but it does work
+            FileUtils.moveDirectory(fromDir, toDir);
+        }
+        
+        @Override
+        public boolean supportsAtomicDirectoryMove() {
+            // Files/move with non-empty directory doesn't work well on Windows
+            // FileUtils.moveDirectory is not atomic
+            return false;
+        }
+    }
+    
+    
+    protected AdvancedFSOps() {
+        //NOOP, but restricted permissions
+    }
+
+    /**
+     * Set directory permissions to (OWNER)RWX (GROUP)R-X (OTHER)---.
+     * On systems that do not support POSIX permissions this may be a no-op.
+     * @param dir the directory to change permissions on
+     * @throws IOException on any error
+     */
+    public void restrictDirectoryPermissions(File dir) throws IOException {
+        Set<PosixFilePermission> perms = new HashSet<>(
+                Arrays.asList(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+                        PosixFilePermission.OWNER_EXECUTE, PosixFilePermission.GROUP_READ,
+                        PosixFilePermission.GROUP_EXECUTE));
+        Files.setPosixFilePermissions(dir.toPath(), perms);
+    }
+
+    /**
+     * Move fromDir to toDir, and try to make it an atomic move if possible
+     * @param fromDir what to move
+     * @param toDir where to move it to
+     * @throws IOException on any error
+     */
+    public void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException {
+        FileUtils.forceMkdir(toDir);
+        Files.move(fromDir.toPath(), toDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
+    }
+    
+    /**
+     * @return true if an atomic directory move works, else false.
+     */
+    public boolean supportsAtomicDirectoryMove() {
+        return true;
+    }
+    
+    /**
+     * Copy a directory
+     * @param fromDir from where
+     * @param toDir to where
+     * @throws IOException on any error
+     */
+    public void copyDirectory(File fromDir, File toDir) throws IOException {
+        FileUtils.copyDirectory(fromDir, toDir);
+    }
+    
+    /**
+     * Setup permissions properly for an internal blob store path
+     * @param path the path to set the permissions on
+     * @param user the user to change the permissions for
+     * @throws IOException on any error
+     */
+    public void setupBlobPermissions(File path, String user) throws IOException {
+        //Normally this is a NOOP
+    }
+
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @param user the user to delete it as, if deleting as another user is supported
+     * @param logPrefix the prefix to include in the logs if an external process
+     * needs to be launched to do the delete
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
+        //by default no need to do this as a different user
+        deleteIfExists(path);
+    }
+    
+    /**
+     * Delete a file or a directory and all of its children, if it exists.
+     * @param path what to delete
+     * @throws IOException on any error.
+     */
+    public void deleteIfExists(File path) throws IOException {
+        LOG.info("Deleting path {}", path);
+        Path p = path.toPath();
+        if (Files.exists(p)) {
+            try {
+                FileUtils.forceDelete(path);
+            } catch (FileNotFoundException ignored) {}
+        }
+    }
+
+    /**
+     * Setup the permissions for the storm code dir
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    public void setupStormCodeDir(Map<String, Object> topologyConf, File path) throws IOException {
+        //By default this is a NOOP
+    }
+
+    /**
+     * Setup the permissions for the worker artifacts dirs
+     * @param topologyConf the config of the Topology
+     * @param path the directory to set the permissions on
+     * @throws IOException on any error
+     */
+    public void setupWorkerArtifactsDir(Map<String, Object> topologyConf, File path) throws IOException {
+        //By default this is a NOOP
+    }
+
+    /**
+     * Sanity check if everything the topology needs is there for it to run.
+     * @param conf the config of the supervisor
+     * @param topologyId the ID of the topology
+     * @return true if everything is there, else false
+     * @throws IOException on any error
+     */
+    public boolean doRequiredTopoFilesExist(Map<String, Object> conf, String topologyId) throws IOException {
+        return SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId);
+    }
+    
+    /**
+     * Makes a directory, including any necessary but nonexistent parent
+     * directories. 
+     *
+     * @param path the directory to create
+     * @throws IOException on any error
+     */
+    public void forceMkdir(File path) throws IOException {
+        FileUtils.forceMkdir(path);
+    }
+    
+    /**
+     * Check if a file exists or not
+     * @param path the path to check
+     * @return true if it exists else false
+     * @throws IOException on any error.
+     */
+    public boolean fileExists(File path) throws IOException {
+        return path.exists();
+    }
+
+    /**
+     * Get a writer for the given location 
+     * @param file the file to write to
+     * @return the Writer to use.
+     * @throws IOException on any error
+     */
+    public Writer getWriter(File file) throws IOException {
+        return new FileWriter(file);
+    }
+    
+    /**
+     * Get an output stream to write to a given file
+     * @param file the file to write to
+     * @return an OutputStream for that file
+     * @throws IOException on any error
+     */
+    public OutputStream getOutputStream(File file) throws IOException {
+        return new FileOutputStream(file);
+    }
+    
+    /**
+     * Dump a string to a file
+     * @param location where to write to
+     * @param data the data to write
+     * @throws IOException on any error
+     */
+    public void dump(File location, String data) throws IOException {
+        File parent = location.getParentFile();
+        if (!parent.exists()) {
+            forceMkdir(parent);
+        }
+        try (Writer w = getWriter(location)) {
+            w.write(data);
+        }
+    }
+    
+    /**
+     * Read the contents of a file into a String
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */
+    public String slurpString(File location) throws IOException {
+        return FileUtils.readFileToString(location, "UTF-8");
+    }
+
+    /**
+     * Read the contents of a file into a byte array.
+     * @param location the file to read
+     * @return the contents of the file
+     * @throws IOException on any error
+     */ 
+    public byte[] slurp(File location) throws IOException {
+        return FileUtils.readFileToByteArray(location);
+    }
+
+    /**
+     * Create a symbolic link pointing at target
+     * @param link the link to create
+     * @param target where it should point to
+     * @throws IOException on any error.
+     */
+    public void createSymlink(File link, File target) throws IOException {
+        Path plink = link.toPath().toAbsolutePath();
+        Path ptarget = target.toPath().toAbsolutePath();
+        LOG.debug("Creating symlink [{}] to [{}]", plink, ptarget);
+        if (Files.exists(plink)) {
+            if (Files.isSameFile(plink, ptarget)) {
+                //It already points where we want it to
+                return;
+            }
+            FileUtils.forceDelete(link);
+        }
+        Files.createSymbolicLink(plink, ptarget);
+    }
+    
+}
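
A short usage sketch of the class above (illustrative only, not part of this
commit; the /tmp path is a stand-in):

    Map<String, Object> conf = ConfigUtils.readStormConfig();
    AdvancedFSOps ops = AdvancedFSOps.make(conf); // picks the Windows/run-as-user variant if needed
    File scratch = new File("/tmp/fsops-sketch");
    ops.forceMkdir(scratch);
    ops.dump(new File(scratch, "state.txt"), "hello");
    String back = ops.slurpString(new File(scratch, "state.txt"));
    ops.deleteIfExists(scratch);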

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
new file mode 100644
index 0000000..daa1d00
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -0,0 +1,713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * A container that runs processes on the local box.
+ */
+public class BasicContainer extends Container {
+    private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class);
+    private static final FilenameFilter jarFilter = new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+            return name.endsWith(".jar");
+        }
+    };
+    private static final Joiner CPJ = 
+            Joiner.on(Utils.CLASS_PATH_SEPARATOR).skipNulls();
+    
+    protected final LocalState _localState;
+    protected final String _profileCmd;
+    protected final String _stormHome = System.getProperty("storm.home");
+    protected volatile boolean _exitedEarly = false;
+
+    private class ProcessExitCallback implements ExitCodeCallback {
+        private final String _logPrefix;
+
+        public ProcessExitCallback(String logPrefix) {
+            _logPrefix = logPrefix;
+        }
+
+        @Override
+        public void call(int exitCode) {
+            LOG.info("{} exited with code: {}", _logPrefix, exitCode);
+            _exitedEarly = true;
+        }
+    }
+    
+    /**
+     * Create a new BasicContainer
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on.  Should be <= 0 if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param localState the local state of the supervisor.  Must not be null.
+     * @param workerId the id of the worker to use.  Must not be null if doing a partial recovery.
+     */
+    public BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment,
+            LocalState localState, String workerId) throws IOException {
+        this(type, conf, supervisorId, port, assignment, localState, workerId, null, null, null);
+    }
+    
+    /**
+     * Create a new BasicContainer
+     * @param type the type of container being made.
+     * @param conf the supervisor config
+     * @param supervisorId the ID of the supervisor this is a part of.
+     * @param port the port the container is on.  Should be <= 0 if only a partial recovery
+     * @param assignment the assignment for this container. Should be null if only a partial recovery.
+     * @param localState the local state of the supervisor.  Must not be null.
+     * @param workerId the id of the worker to use.  Must not be null if doing a partial recovery.
+     * @param ops file system operations (mostly for testing) if null a new one is made
+     * @param topoConf the config of the topology (mostly for testing); if null
+     * and this is not a partial recovery, the real conf is read.
+     * @param profileCmd the command to use when profiling (used for testing)
+     * @throws IOException on any error
+     * @throws ContainerRecoveryException if the Container could not be recovered.
+     */
+    BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
+            LocalAssignment assignment,
+            LocalState localState, String workerId, Map<String, Object> topoConf, 
+            AdvancedFSOps ops, String profileCmd) throws IOException {
+        super(type, conf, supervisorId, port, assignment, workerId, topoConf, ops);
+        assert(localState != null);
+        _localState = localState;
+
+        if (type.isRecovery() && !type.isOnlyKillable()) {
+            synchronized (localState) {
+                String wid = null;
+                Map<String, Integer> workerToPort = localState.getApprovedWorkers();
+                for (Map.Entry<String, Integer> entry : workerToPort.entrySet()) {
+                    if (port == entry.getValue().intValue()) {
+                        wid = entry.getKey();
+                    }
+                }
+                if (wid == null) {
+                    throw new ContainerRecoveryException("Could not find worker id for " + port + " " + assignment);
+                }
+                LOG.info("Recovered Worker {}", wid);
+                _workerId = wid;
+            }
+        } else if (_workerId == null){
+            createNewWorkerId();
+        }
+
+        if (profileCmd == null) {
+            profileCmd = _stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR
+                    + conf.get(Config.WORKER_PROFILER_COMMAND);
+        }
+        _profileCmd = profileCmd;
+    }
+
+    /**
+     * Create a new worker ID for this process, and store it in this object and
+     * in the local state.  Never call this if a worker is currently up and
+     * running; we would lose track of the process.
+     */
+    protected void createNewWorkerId() {
+        _type.assertFull();
+        assert(_workerId == null);
+        synchronized (_localState) {
+            _workerId = Utils.uuid();
+            Map<String, Integer> workerToPort = _localState.getApprovedWorkers();
+            if (workerToPort == null) {
+                workerToPort = new HashMap<>(1);
+            }
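+            // Drop any stale worker ids still bound to this port before
+            // registering the newly created one.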
+            removeWorkersOn(workerToPort, _port);
+            workerToPort.put(_workerId, _port);
+            _localState.setApprovedWorkers(workerToPort);
+            LOG.info("Created Worker ID {}", _workerId);
+        }
+    }
+
+    private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) {
+        for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext();) {
+            Entry<String, Integer> found = i.next();
+            if (_port == found.getValue().intValue()) {
+                LOG.warn("Deleting worker {} from state", found.getKey());
+                i.remove();
+            }
+        }
+    }
+
+    @Override
+    public void cleanUpForRestart() throws IOException {
+        String origWorkerId = _workerId;
+        super.cleanUpForRestart();
+        synchronized (_localState) {
+            Map<String, Integer> workersToPort = _localState.getApprovedWorkers();
+            workersToPort.remove(origWorkerId);
+            removeWorkersOn(workersToPort, _port);
+            _localState.setApprovedWorkers(workersToPort);
+            LOG.info("Removed Worker ID {}", origWorkerId);
+        }
+    }
+
+    @Override
+    public void relaunch() throws IOException {
+        _type.assertFull();
+        //We are launching it now...
+        _type = ContainerType.LAUNCH;
+        createNewWorkerId();
+        setup();
+        launch();
+    }
+
+    @Override
+    public boolean didMainProcessExit() {
+        return _exitedEarly;
+    }
+
+    /**
+     * Run the given command for profiling
+     * 
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param targetDir
+     *            the working directory to run the command in
+     * @return true if it ran successfully, else false
+     * @throws IOException
+     *             on any error
+     * @throws InterruptedException
+     *             if interrupted while waiting for the process to exit.
+     */
+    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
+            File targetDir) throws IOException, InterruptedException {
+        _type.assertFull();
+        Process p = SupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir);
+        int ret = p.waitFor();
+        return ret == 0;
+    }
+
+    @Override
+    public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException {
+        _type.assertFull();
+        String targetDir = ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port);
+
+        @SuppressWarnings("unchecked")
+        Map<String, String> env = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (env == null) {
+            env = new HashMap<String, String>();
+        }
+
+        String str = ConfigUtils.workerArtifactsPidPath(_conf, _topologyId, _port);
+
+        String workerPid = _ops.slurpString(new File(str)).trim();
+
+        ProfileAction profileAction = request.get_action();
+        String logPrefix = "ProfilerAction process " + _topologyId + ":" + _port + " PROFILER_ACTION: " + profileAction
+                + " ";
+
+        List<String> command = mkProfileCommand(profileAction, stop, workerPid, targetDir);
+
+        File targetFile = new File(targetDir);
+        if (command.size() > 0) {
+            return runProfilingCommand(command, env, logPrefix, targetFile);
+        }
+        LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
+        return true;
+    }
+
+    /**
+     * Get the command to run when doing profiling
+     * @param action the profiling action to perform
+     * @param stop if this is meant to stop the profiling or start it
+     * @param workerPid the PID of the process to profile
+     * @param targetDir the current working directory of the worker process
+     * @return the command to run for profiling.
+     */
+    private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+        switch(action) {
+            case JMAP_DUMP:
+                return jmapDumpCmd(workerPid, targetDir);
+            case JSTACK_DUMP:
+                return jstackDumpCmd(workerPid, targetDir);
+            case JPROFILE_DUMP:
+                return jprofileDump(workerPid, targetDir);
+            case JVM_RESTART:
+                return jprofileJvmRestart(workerPid);
+            case JPROFILE_STOP:
+                if (stop) {
+                    return jprofileStop(workerPid, targetDir);
+                }
+                return jprofileStart(workerPid);
+            default:
+                return Lists.newArrayList();
+        }
+    }
+
+    private List<String> jmapDumpCmd(String pid, String targetDir) {
+        return Lists.newArrayList(_profileCmd, pid, "jmap", targetDir);
+    }
+
+    private List<String> jstackDumpCmd(String pid, String targetDir) {
+        return Lists.newArrayList(_profileCmd, pid, "jstack", targetDir);
+    }
+
+    private List<String> jprofileStart(String pid) {
+        return Lists.newArrayList(_profileCmd, pid, "start");
+    }
+
+    private List<String> jprofileStop(String pid, String targetDir) {
+        return Lists.newArrayList(_profileCmd, pid, "stop", targetDir);
+    }
+
+    private List<String> jprofileDump(String pid, String targetDir) {
+        return Lists.newArrayList(_profileCmd, pid, "dump", targetDir);
+    }
+
+    private List<String> jprofileJvmRestart(String pid) {
+        return Lists.newArrayList(_profileCmd, pid, "kill");
+    }
+
+    /**
+     * Compute the java.library.path that should be used for the worker.
+     * This helps it to load JNI libraries that are packaged in the uber jar.
+     * @param stormRoot the root directory of the worker process
+     * @param conf the config for the supervisor.
+     * @return the java.library.path/LD_LIBRARY_PATH to use so native libraries load correctly.
+     */
+    protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
+        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+        String arch = System.getProperty("os.arch");
+        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+        String ret = CPJ.join(archResourceRoot, resourceRoot,
+                conf.get(Config.JAVA_LIBRARY_PATH));
+        return ret;
+    }
+
+    /**
+     * Returns a collection of jar file names found under the given directory.
+     * @param dir the directory to search
+     * @return the jar file names
+     */
+    protected List<String> getFullJars(File dir) {
+        File[] files = dir.listFiles(jarFilter);
+
+        if (files == null) {
+            return Collections.emptyList();
+        }
+        ArrayList<String> ret = new ArrayList<>(files.length);
+        for (File f: files) {
+            ret.add(f.getAbsolutePath());
+        }
+        return ret;
+    }
+    
+    protected List<String> frameworkClasspath() {
+        File stormLibDir = new File(_stormHome, "lib");
+        String stormConfDir =
+                System.getenv("STORM_CONF_DIR") != null ?
+                System.getenv("STORM_CONF_DIR") :
+                new File(_stormHome, "conf").getAbsolutePath();
+        File stormExtlibDir = new File(_stormHome, "extlib");
+        String extcp = System.getenv("STORM_EXT_CLASSPATH");
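+        // extcp may legitimately be null; CPJ is built with skipNulls(), so a
+        // null entry is dropped when the classpath is finally joined.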
+        List<String> pathElements = new LinkedList<>();
+        pathElements.addAll(getFullJars(stormLibDir));
+        pathElements.addAll(getFullJars(stormExtlibDir));
+        pathElements.add(extcp);
+        pathElements.add(stormConfDir);
+
+        return pathElements;
+    }
+    
+    @SuppressWarnings("unchecked")
+    private List<String> asStringList(Object o) {
+        if (o instanceof String) {
+            return Arrays.asList((String)o);
+        } else if (o instanceof List) {
+            return (List<String>)o;
+        }
+        return Collections.emptyList();
+    }
+    
+    /**
+     * Compute the classpath for the worker process
+     * @param stormJar the topology jar
+     * @param dependencyLocations any dependencies from the topology
+     * @return the full classpath
+     */
+    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations) {
+        List<String> workercp = new ArrayList<>();
+        workercp.addAll(frameworkClasspath());
+        workercp.add(stormJar);
+        workercp.addAll(dependencyLocations);
+        workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
+        return CPJ.join(workercp);
+    }
+
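+    /**
+     * Substitute the supported placeholders (%ID%, %WORKER-ID%, %TOPOLOGY-ID%,
+     * %WORKER-PORT% and, when memOnheap > 0, %HEAP-MEM%) into a childopts string.
+     */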
+    private String substituteChildOptsInternal(String string, int memOnheap) {
+        if (StringUtils.isNotBlank(string)) {
+            String p = String.valueOf(_port);
+            string = string.replace("%ID%", p);
+            string = string.replace("%WORKER-ID%", _workerId);
+            string = string.replace("%TOPOLOGY-ID%", _topologyId);
+            string = string.replace("%WORKER-PORT%", p);
+            if (memOnheap > 0) {
+                string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
+            }
+        }
+        return string;
+    }
+    
+    protected List<String> substituteChildopts(Object value) {
+        return substituteChildopts(value, -1);
+    }
+
+    protected List<String> substituteChildopts(Object value, int memOnheap) {
+        List<String> rets = new ArrayList<>();
+        if (value instanceof String) {
+            String string = substituteChildOptsInternal((String) value, memOnheap);
+            if (StringUtils.isNotBlank(string)) {
+                String[] strings = string.split("\\s+");
+                for (String s: strings) {
+                    if (StringUtils.isNotBlank(s)) {
+                        rets.add(s);
+                    }
+                }
+            }
+        } else if (value instanceof List) {
+            @SuppressWarnings("unchecked")
+            List<String> objects = (List<String>) value;
+            for (String object : objects) {
+                String str = substituteChildOptsInternal(object, memOnheap);
+                if (StringUtils.isNotBlank(str)) {
+                    rets.add(str);
+                }
+            }
+        }
+        return rets;
+    }
+
+    /**
+     * Launch the worker process (non-blocking)
+     * 
+     * @param command
+     *            the command to run
+     * @param env
+     *            the environment to run the command
+     * @param processExitCallback
+     *            a callback for when the process exits
+     * @param logPrefix
+     *            the prefix to include in the logs
+     * @param targetDir
+     *            the working directory to run the command in
+     * @throws IOException
+     *             on any error
+     */
+    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
+            ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+        SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
+    }
+
+    private String getWorkerLoggingConfigFile() {
+        String log4jConfigurationDir = (String) (_conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+        if (StringUtils.isNotBlank(log4jConfigurationDir)) {
+            if (!Utils.isAbsolutePath(log4jConfigurationDir)) {
+                log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + log4jConfigurationDir;
+            }
+        } else {
+            log4jConfigurationDir = _stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+        }
+ 
+        if (Utils.IS_ON_WINDOWS && !log4jConfigurationDir.startsWith("file:")) {
+            log4jConfigurationDir = "file:///" + log4jConfigurationDir;
+        }
+        return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+    }
+    
+    private static class DependencyLocations {
+        private List<String> _data = null;
+        private final Map<String, Object> _conf;
+        private final String _topologyId;
+        private final AdvancedFSOps _ops;
+        private final String _stormRoot;
+        
+        public DependencyLocations(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) {
+            _conf = conf;
+            _topologyId = topologyId;
+            _ops = ops;
+            _stormRoot = stormRoot;
+        }
+        
+        public String toString() {
+            List<String> data;
+            synchronized(this) {
+                data = _data;
+            }
+            return "DEP_LOCS for " + _topologyId +" => " + data;
+        }
+        
+        public synchronized List<String> get() throws IOException {
+            if (_data != null) {
+                return _data;
+            }
+            final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
+            final List<String> dependencyLocations = new ArrayList<>();
+            if (stormTopology.get_dependency_jars() != null) {
+                for (String dependency : stormTopology.get_dependency_jars()) {
+                    dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
+                }
+            }
+
+            if (stormTopology.get_dependency_artifacts() != null) {
+                for (String dependency : stormTopology.get_dependency_artifacts()) {
+                    dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
+                }
+            }
+            _data = dependencyLocations;
+            return _data;
+        }
+    }
+
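+    /**
+     * A tiny LRU cache of DependencyLocations keyed by topology id, so the
+     * supervisor topology is not re-read from disk on every worker launch.
+     */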
+    static class DepLRUCache {
+        public final int _maxSize = 100; //We could make this configurable in the future...
+        
+        @SuppressWarnings("serial")
+        private LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String, DependencyLocations>() {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String,DependencyLocations> eldest) {
+                return (size() > _maxSize);
+            }
+        };
+        
+        public synchronized DependencyLocations get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
+            //Only go off of the topology id for now.
+            DependencyLocations dl = _cache.get(topologyId);
+            if (dl == null) {
+                _cache.put(topologyId, new DependencyLocations(conf, topologyId, ops, stormRoot));
+                dl = _cache.get(topologyId);
+            }
+            return dl;
+        }
+        
+        public synchronized void clear() {
+            _cache.clear();
+        }
+    }
+    
+    static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();
+    
+    public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException {
+        return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
+    }
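
[One subtlety in DepLRUCache above: the LinkedHashMap is built with the default constructor, which keeps insertion order, so removeEldestEntry evicts the oldest-inserted topology rather than the least-recently-used one. A true LRU would pass accessOrder=true; a minimal sketch of that variant, illustrative only and not what this commit does:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Access-ordered LRU sketch: with accessOrder=true, get() moves an
    // entry to the tail, so removeEldestEntry drops the least-recently-used.
    class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
        private final int maxSize;

        LruCacheSketch(int maxSize) {
            super(16, 0.75f, true); // accessOrder = true
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxSize;
        }
    }
]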
+    
+    /**
+     * Get the class path parameters for the worker process.  Also used by the
+     * log writer.
+     * @param stormRoot the root dist dir for the topology
+     * @return the classpath for the topology as command line arguments.
+     * @throws IOException on any error.
+     */
+    private List<String> getClassPathParams(final String stormRoot) throws IOException {
+        final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+        final List<String> dependencyLocations = getDependencyLocationsFor(_conf, _topologyId, _ops, stormRoot);
+        final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations);
+        
+        List<String> classPathParams = new ArrayList<>();
+        classPathParams.add("-cp");
+        classPathParams.add(workerClassPath);
+        return classPathParams;
+    }
+    
+    /**
+     * Get a set of java properties that are common to both the log writer and the worker processes.
+     * These are mostly system properties that are used by logging.
+     * @return a list of command line options
+     */
+    private List<String> getCommonParams() {
+        final String workersArtifacts = ConfigUtils.workerArtifactsRoot(_conf);
+        String stormLogDir = ConfigUtils.getLogDir();
+        String log4jConfigurationFile = getWorkerLoggingConfigFile();
+        
+        List<String> commonParams = new ArrayList<>();
+        commonParams.add("-Dlogging.sensitivity=" + OR((String) _topoConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY), "S3"));
+        commonParams.add("-Dlogfile.name=worker.log");
+        commonParams.add("-Dstorm.home=" + OR(_stormHome, ""));
+        commonParams.add("-Dworkers.artifacts=" + workersArtifacts);
+        commonParams.add("-Dstorm.id=" + _topologyId);
+        commonParams.add("-Dworker.id=" + _workerId);
+        commonParams.add("-Dworker.port=" + _port);
+        commonParams.add("-Dstorm.log.dir=" + stormLogDir);
+        commonParams.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+        commonParams.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+        commonParams.add("-Dstorm.local.dir=" + _conf.get(Config.STORM_LOCAL_DIR));
+        return commonParams;
+    }
+    
+    private int getMemOnHeap(WorkerResources resources) {
+        int memOnheap = 0;
+        if (resources != null && resources.is_set_mem_on_heap() && 
+                resources.get_mem_on_heap() > 0) {
+            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+        } else {
+            // fall back to the worker heap size from the topology conf, defaulting to 768 MB
+            memOnheap = Utils.getInt(_topoConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+        }
+        return memOnheap;
+    }
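
[For example, a resource-aware assignment with mem_on_heap = 512.5 yields ceil(512.5) = 513 MB on heap from the method above; without a resource assignment it falls back to the topology's Config.WORKER_HEAP_MEMORY_MB setting, 768 MB by default here.]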
+    
+    private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+        List<String> workerProfilerChildopts = new ArrayList<>();
+        if (Utils.getBoolean(_conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+            workerProfilerChildopts = substituteChildopts(_conf.get(Config.WORKER_PROFILER_CHILDOPTS), memOnheap);
+        }
+        return workerProfilerChildopts;
+    }
+    
+    /**
+     * Return the first of a or b that is not null.
+     * @param a the preferred value
+     * @param b the fallback value
+     * @return a if it is not null, otherwise b
+     */
+    private <V> V OR(V a, V b) {
+        return a == null ? b : a;
+    }
+    
+    protected String javaCmd(String cmd) {
+        String ret = null;
+        String javaHome = System.getenv().get("JAVA_HOME");
+        if (StringUtils.isNotBlank(javaHome)) {
+            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+        } else {
+            ret = cmd;
+        }
+        return ret;
+    }
+    
+    /**
+     * Create the command to launch the worker process.
+     * @param memOnheap the on heap memory for the worker
+     * @param stormRoot the root dist dir for the topology
+     * @param jlp the java library path for the topology
+     * @return the command to run
+     * @throws IOException on any error.
+     */
+    private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+            final String jlp) throws IOException {
+        final String javaCmd = javaCmd("java");
+        final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+        final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+        final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
+        
+        List<String> classPathParams = getClassPathParams(stormRoot);
+        List<String> commonParams = getCommonParams();
+        
+        List<String> commandList = new ArrayList<>();
+        //Log Writer Command...
+        commandList.add(javaCmd);
+        commandList.addAll(classPathParams);
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
+        commandList.addAll(commonParams);
+        commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
+
+        //Worker Command...
+        commandList.add(javaCmd);
+        commandList.add("-server");
+        commandList.addAll(commonParams);
+        commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
+        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+        commandList.addAll(substituteChildopts(OR(
+                _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
+                _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
+        commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+        commandList.add("-Djava.library.path=" + jlp);
+        commandList.add("-Dstorm.conf.file=" + stormConfFile);
+        commandList.add("-Dstorm.options=" + stormOptions);
+        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+        commandList.addAll(classPathParams);
+        commandList.add("org.apache.storm.daemon.worker");
+        commandList.add(_topologyId);
+        commandList.add(_supervisorId);
+        commandList.add(String.valueOf(_port));
+        commandList.add(_workerId);
+        
+        return commandList;
+    }
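
[Note the shape of the result: the list is a single argv in which everything after org.apache.storm.LogWriter is itself a complete java command. LogWriter launches that trailing command and relays the worker's stdout/stderr into the worker log (the commonParams and classPathParams appear in both halves). Schematically, with placeholders rather than literal values:

    java -cp <classpath> <logwriter childopts> <common -D params> org.apache.storm.LogWriter \
      java -server <common -D params> <worker/topology/GC/profiler childopts> \
        -Djava.library.path=<jlp> -Dstorm.conf.file=<conf> -Dstorm.options=<opts> \
        -Djava.io.tmpdir=<worker tmp dir> -cp <classpath> \
        org.apache.storm.daemon.worker <topology-id> <supervisor-id> <port> <worker-id>
]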
+
+    @Override
+    public void launch() throws IOException {
+        _type.assertFull();
+        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
+                _supervisorId, _port, _workerId);
+        String logPrefix = "Worker Process " + _workerId;
+        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
+        _exitedEarly = false;
+        
+        final WorkerResources resources = _assignment.get_resources();
+        final int memOnheap = getMemOnHeap(resources);
+        final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+        final String jlp = javaLibraryPath(stormRoot, _conf);
+        
+        List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);
+
+        Map<String, String> topEnvironment = new HashMap<String, String>();
+        @SuppressWarnings("unchecked")
+        Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (environment != null) {
+            topEnvironment.putAll(environment);
+        }
+        topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+        String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
+
+        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
+    }
+}
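
[substituteChildopts is called throughout mkLaunchCommand but defined elsewhere in this change. As a rough sketch of the substitution it is expected to perform, using the placeholder names documented for worker childopts; the helper below is illustrative, not the commit's code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ChildoptsSketch {
        // Expands childopts placeholders, then splits into argv tokens.
        static List<String> expand(String opts, String workerId,
                String topologyId, int port, int memOnheapMb) {
            if (opts == null) {
                return new ArrayList<>();
            }
            String s = opts.replace("%ID%", String.valueOf(port))
                    .replace("%WORKER-ID%", workerId)
                    .replace("%TOPOLOGY-ID%", topologyId)
                    .replace("%WORKER-PORT%", String.valueOf(port))
                    .replace("%HEAP-MEM%", String.valueOf(memOnheapMb));
            return Arrays.asList(s.split("\\s+"));
        }
    }

Under that sketch, expand("-Xmx%HEAP-MEM%m", wid, tid, 6700, 768) would yield ["-Xmx768m"], which is how the heap size computed by getMemOnHeap reaches the worker JVM.]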

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
new file mode 100644
index 0000000..7dacf14
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+/**
+ * Launch containers with no security, using standard Java commands.
+ */
+public class BasicContainerLauncher extends ContainerLauncher {
+    private final Map<String, Object> _conf;
+    private final String _supervisorId;
+    
+    public BasicContainerLauncher(Map<String, Object> conf, String supervisorId) throws IOException {
+        _conf = conf;
+        _supervisorId = supervisorId;
+    }
+
+    @Override
+    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        Container container = new BasicContainer(ContainerType.LAUNCH, _conf, _supervisorId, port, assignment,
+                state, null);
+        container.setup();
+        container.launch();
+        return container;
+    }
+
+    @Override
+    public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+        return new BasicContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port, assignment,
+                state, null);
+    }
+
+    @Override
+    public Killable recoverContainer(String workerId, LocalState localState) throws IOException {
+        return new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null,
+                    localState, workerId);
+    }
+}
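
[A minimal sketch of a supervisor-side caller, assuming conf, assignment, and localState come from supervisor wiring that is outside this hunk:

    // Hypothetical caller; only launchContainer/recoverContainer above are from the hunk.
    ContainerLauncher launcher = new BasicContainerLauncher(conf, supervisorId);
    Container running = launcher.launchContainer(6700, assignment, localState); // setup() then launch()
    // After a supervisor restart, reattach to the already-running worker instead:
    Container recovered = launcher.recoverContainer(6700, assignment, localState);
]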

