incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [2/2] git commit: Improve application and platform configuration
Date Thu, 17 Jan 2013 15:49:23 GMT
Updated Branches:
  refs/heads/S4-59 [created] d3b7c30ab


Improve application and platform configuration

- we now have 3 layers:
1/ base layer. It simply registers the node in the logical cluster,
and monitors application configuration for the cluster
2/ platform layer. Based on the application configuration, it loads custom-defined
modules, fetching them if necessary, then starts a Server with distributed app loader,
or starts an app directly
3/ app layer. Loads app code, fetching it remotely if necessary, and starts the app

- these layers use a hierarchy of classloaders (loading from module jars and the app S4R)
and a hierarchy of Guice injectors

- also fixed the -clean parameter in the zkServer tool (with an arity of 0, specifying -clean
means cleaning; not specifying it means not cleaning)


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d3b7c30a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d3b7c30a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d3b7c30a

Branch: refs/heads/S4-59
Commit: d3b7c30ab5999fc55f3d4c1b4cd92e36faa09f14
Parents: d9e1599
Author: Matthieu Morel <mmorel@apache.org>
Authored: Tue Jan 15 20:18:05 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Jan 17 16:43:49 2013 +0100

----------------------------------------------------------------------
 subprojects/s4-base/s4-base.gradle                 |    2 +-
 .../org/apache/s4/base/util/ModulesLoader.java     |   16 +
 .../org/apache/s4/base/util/S4RLoaderFactory.java  |    4 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |   28 +--
 .../org/apache/s4/comm/ModulesLoaderFactory.java   |  112 +++++++
 .../apache/s4/comm/util/ArchiveFetchException.java |   14 +
 .../org/apache/s4/comm/util/ArchiveFetcher.java    |   42 +++
 .../s4/comm/util/FileSystemArchiveFetcher.java     |   41 +++
 .../apache/s4/comm/util/HttpArchiveFetcher.java    |  187 ++++++++++++
 .../org/apache/s4/comm/util/RemoteFileFetcher.java |   23 ++
 .../src/main/resources/default.s4.comm.properties  |    3 -
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |    7 +-
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |    6 +-
 .../org/apache/s4/fixtures/TestCommModule.java     |   38 +++
 .../main/java/org/apache/s4/core/BaseModule.java   |   80 +++++
 .../java/org/apache/s4/core/DefaultCoreModule.java |   15 +
 .../src/main/java/org/apache/s4/core/Main.java     |  224 --------------
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |  228 +++++++++++++++
 .../src/main/java/org/apache/s4/core/S4Node.java   |   79 +++++
 .../src/main/java/org/apache/s4/core/Server.java   |    2 +-
 .../s4/core/ft/DefaultFileSystemStateStorage.java  |   22 +--
 .../ft/FileSystemBackendCheckpointingModule.java   |   30 ++-
 .../java/org/apache/s4/core/util/AppConfig.java    |  159 ++++++++++
 .../s4/core/util/ParametersInjectionModule.java    |    3 +-
 .../java/org/apache/s4/deploy/DeploymentUtils.java |   38 +++
 .../s4/deploy/DistributedDeploymentManager.java    |   63 +++--
 .../org/apache/s4/deploy/FileSystemS4RFetcher.java |   41 ---
 .../java/org/apache/s4/deploy/HttpS4RFetcher.java  |  186 ------------
 .../main/java/org/apache/s4/deploy/S4RFetcher.java |   42 ---
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    4 +
 .../org/apache/s4/core/ft/FTWordCountTest.java     |   23 +-
 ...ndWithZKStorageCallbackCheckpointingModule.java |    8 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |   31 ++-
 .../s4/core/moduleloader/TestModuleLoader.java     |  131 +++++++++
 .../core/moduleloader/TestModuleLoaderRemote.java  |   12 +
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   16 +-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   17 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    9 +-
 .../org/apache/s4/wordcount/WordClassifierPE.java  |    2 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   36 +--
 .../src/test/java/org/apache/s4/edsl/TestEDSL.java |    9 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   74 ++++--
 .../src/main/java/org/apache/s4/tools/Tools.java   |    6 +-
 .../main/java/org/apache/s4/tools/ZKServer.java    |    2 +-
 44 files changed, 1452 insertions(+), 663 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/s4-base.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/s4-base.gradle b/subprojects/s4-base/s4-base.gradle
index fffe198..e38ac28 100644
--- a/subprojects/s4-base/s4-base.gradle
+++ b/subprojects/s4-base/s4-base.gradle
@@ -19,5 +19,5 @@
 
 description = 'Interfaces and most basic classes required by nultiple modules.'
  
-dependencies {
+dependencies {
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
new file mode 100644
index 0000000..6dab438
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
@@ -0,0 +1,16 @@
+package org.apache.s4.base.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A classloader for loading module classes from a list of URLs, typically locally copied/extracted files.
+ * 
+ */
+public class ModulesLoader extends URLClassLoader {
+
+    public ModulesLoader(URL[] urls) {
+        super(urls);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
index d33c3bf..527c57f 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
@@ -48,7 +48,7 @@ public class S4RLoaderFactory {
 
     private static Logger logger = LoggerFactory.getLogger(S4RLoaderFactory.class);
 
-    @Inject(optional = true)
+    @Inject
     @Named("s4.tmp.dir")
     File tmpDir;
 
@@ -69,7 +69,7 @@ public class S4RLoaderFactory {
         File s4rDir = null;
         if (tmpDir == null) {
             s4rDir = Files.createTempDir();
-            s4rDir.deleteOnExit();
+            // s4rDir.deleteOnExit();
             logger.warn(
                     "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
                     s4rDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..2fa3e0e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -20,7 +20,6 @@ package org.apache.s4.comm;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
 
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
@@ -32,8 +31,6 @@ import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterFromZK;
 import org.apache.s4.comm.topology.Clusters;
@@ -55,7 +52,6 @@ public class DefaultCommModule extends AbstractModule {
     private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
     InputStream commConfigInputStream;
     private PropertiesConfiguration config;
-    String clusterName;
 
     /**
      * 
@@ -65,10 +61,9 @@ public class DefaultCommModule extends AbstractModule {
      *            the name of the cluster to which the current node belongs. If specified in the configuration file,
      *            this parameter will be ignored.
      */
-    public DefaultCommModule(InputStream commConfigInputStream, String clusterName) {
+    public DefaultCommModule(InputStream commConfigInputStream) {
         super();
         this.commConfigInputStream = commConfigInputStream;
-        this.clusterName = clusterName;
     }
 
     @SuppressWarnings("unchecked")
@@ -89,10 +84,11 @@ public class DefaultCommModule extends AbstractModule {
         /* Use Kryo to serialize events. */
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
-        // a node holds a single partition assignment
-        // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-        bind(Assignment.class).to(AssignmentFromZK.class);
+        // // a node holds a single partition assignment
+        // // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
+        // bind(Assignment.class).to(AssignmentFromZK.class);
         bind(Cluster.class).to(ClusterFromZK.class);
+        // bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
 
         bind(Clusters.class).to(ClustersFromZK.class);
 
@@ -128,20 +124,6 @@ public class DefaultCommModule extends AbstractModule {
             /* Make all properties injectable. Do we need this? */
             Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
 
-            if (clusterName != null) {
-                if (config.containsKey("s4.cluster.name")) {
-                    logger.warn(
-                            "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
-                            clusterName, config.getProperty("s4.cluster.name"));
-                } else {
-                    Names.bindProperties(binder, new HashMap<String, String>() {
-                        {
-                            put("s4.cluster.name", clusterName);
-                        }
-                    });
-                }
-            }
-
         } catch (ConfigurationException e) {
             binder.addError(e);
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
new file mode 100644
index 0000000..2b89039
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
@@ -0,0 +1,112 @@
+package org.apache.s4.comm;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import org.apache.s4.base.util.ModulesLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ModulesLoaderFactory {
+
+    private static Logger logger = LoggerFactory.getLogger(ModulesLoaderFactory.class);
+
+    @Inject
+    @Named("s4.tmp.dir")
+    File tmpDir;
+
+    /**
+     * Explodes the jar archive in a subdirectory of a user specified directory through "s4.tmp.dir" parameter, and
+     * prepares a classloader that will load classes and resources from, first, the application classes, then the
+     * dependencies.
+     * 
+     * Inspired from Hadoop's application classloading implementation (RunJar class).
+     * 
+     * @param modulesJarPath
+     *            path to s4r
+     * @return classloader that loads resources from the archive in a predefined order
+     */
+    public ModulesLoader createModulesLoader(Iterable<File> modulesFiles) {
+        List<URL> classpath = new ArrayList<URL>();
+        for (File moduleFile : modulesFiles) {
+            addModuleToClasspath(moduleFile, classpath);
+        }
+        return new ModulesLoader(classpath.toArray(new URL[] {}));
+
+    }
+
+    private void addModuleToClasspath(File moduleFile, List<URL> classpath) {
+
+        File moduleDir = null;
+        if (tmpDir == null) {
+            moduleDir = Files.createTempDir();
+            moduleDir.deleteOnExit();
+            logger.warn(
+                    "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+                    moduleDir.getAbsolutePath());
+        } else {
+            moduleDir = new File(tmpDir, moduleFile.getName() + "-" + System.currentTimeMillis());
+            if (!moduleDir.mkdir()) {
+                throw new RuntimeException("Cannot create directory for unzipping S4R file in ["
+                        + moduleDir.getAbsolutePath() + "]. Aborting deployment.");
+            }
+        }
+        logger.info("Unzipping S4R archive in [{}]", moduleDir.getAbsolutePath());
+
+        JarFile jar = null;
+        try {
+            jar = new JarFile(moduleFile);
+            Enumeration<JarEntry> entries = jar.entries();
+            while (entries.hasMoreElements()) {
+                JarEntry entry = entries.nextElement();
+                if (!entry.isDirectory()) {
+                    File to = new File(moduleDir, entry.getName());
+                    Files.createParentDirs(to);
+                    InputStream is = jar.getInputStream(entry);
+                    OutputStream os = new FileOutputStream(to);
+                    try {
+                        ByteStreams.copy(is, os);
+                    } finally {
+                        Closeables.closeQuietly(is);
+                        Closeables.closeQuietly(os);
+                    }
+                }
+            }
+
+            classpath.add(moduleDir.toURI().toURL());
+            addDirLibsToClassPath(classpath, moduleDir, "/lib");
+
+        } catch (IOException e) {
+            logger.error("Cannot process S4R [{}]: {}", moduleFile.getAbsolutePath(),
+                    e.getClass().getName() + "/" + e.getMessage());
+            throw new RuntimeException("Cannot create S4R classloader", e);
+        }
+    }
+
+    private void addDirLibsToClassPath(List<URL> classpath, File s4rDir, String dir) throws MalformedURLException {
+        File[] libs = new File(s4rDir, dir).listFiles();
+        if (libs != null) {
+            for (int i = 0; i < libs.length; i++) {
+                if (!libs[i].isDirectory()) {
+                    classpath.add(libs[i].toURI().toURL());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
new file mode 100644
index 0000000..8567cce
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
@@ -0,0 +1,14 @@
+package org.apache.s4.comm.util;
+
+
+public class ArchiveFetchException extends Exception {
+
+    public ArchiveFetchException(String string) {
+        super(string);
+    }
+
+    public ArchiveFetchException(String string, Throwable throwable) {
+        super(string, throwable);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
new file mode 100644
index 0000000..dbd35ba
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * This interface defines methods to fetch archive files from a URI (S4R or modules jars). Various protocols can be
+ * supported in the implementation classes (e.g. file system, HTTP etc...)
+ * 
+ */
+public interface ArchiveFetcher {
+
+    /**
+     * Returns a stream to an archive file
+     * 
+     * @param uri
+     *            archive identifier
+     * @return an input stream for accessing the content of the archive file
+     * @throws ArchiveFetchException
+     *             when fetching fails
+     */
+    InputStream fetch(URI uri) throws ArchiveFetchException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
new file mode 100644
index 0000000..294877f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Fetches modules jar files and application S4R files from a file system, possibly distributed.
+ * 
+ */
+public class FileSystemArchiveFetcher implements ArchiveFetcher {
+
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        try {
+            return new FileInputStream(new File(uri));
+        } catch (FileNotFoundException e) {
+            throw new ArchiveFetchException("Cannot retrieve file from uri [" + uri.toString() + "]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
new file mode 100644
index 0000000..4ee28d3
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpClientCodec;
+import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
+
+/**
+ * <p>
+ * Fetches modules and app archives through HTTP.
+ * </p>
+ * <p>
+ * The underlying implementation uses Netty, and borrows code from the Netty snoop example.</br>
+ * 
+ * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
+ *      snoop example</a>
+ * 
+ *      </p>
+ */
+public class HttpArchiveFetcher implements ArchiveFetcher {
+
+    private static Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
+
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        logger.debug("Fetching file through http: {}", uri.toString());
+
+        String host = uri.getHost();
+        int port = uri.getPort();
+        if (port == -1) {
+            if (uri.getScheme().equalsIgnoreCase("http")) {
+                port = 80;
+            } else if (uri.getScheme().equalsIgnoreCase("https")) {
+                port = 443;
+            }
+        }
+
+        ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        File tmpFile;
+        try {
+            tmpFile = File.createTempFile("http", "download");
+        } catch (IOException e) {
+            throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
+                    e);
+        }
+        clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
+        ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
+        // TODO timeout?
+        Channel channel = channelFuture.awaitUninterruptibly().getChannel();
+        if (!channelFuture.isSuccess()) {
+            clientBootstrap.releaseExternalResources();
+            throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
+                    channelFuture.getCause());
+        }
+
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
+        request.setHeader(HttpHeaders.Names.HOST, host);
+        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+        channel.write(request);
+
+        channel.getCloseFuture().awaitUninterruptibly();
+
+        clientBootstrap.releaseExternalResources();
+
+        logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
+                tmpFile.getAbsolutePath());
+        try {
+            return new FileInputStream(tmpFile);
+        } catch (FileNotFoundException e) {
+            throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
+                    + tmpFile.getAbsolutePath() + "]");
+        }
+    }
+
+    private class HttpClientPipelineFactory implements ChannelPipelineFactory {
+
+        File tmpFile;
+
+        public HttpClientPipelineFactory(File tmpFile) {
+            this.tmpFile = tmpFile;
+        }
+
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+            // Create a default pipeline implementation.
+            ChannelPipeline pipeline = Channels.pipeline();
+
+            pipeline.addLast("codec", new HttpClientCodec());
+
+            // Remove the following line if you don't want automatic content decompression.
+            pipeline.addLast("inflater", new HttpContentDecompressor());
+
+            pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
+            return pipeline;
+        }
+    }
+
+    // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
+    private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+
+        private boolean readingChunks;
+        FileOutputStream fos;
+
+        public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
+            this.fos = new FileOutputStream(tmpFile);
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            if (!readingChunks) {
+                HttpResponse response = (HttpResponse) e.getMessage();
+
+                if (response.isChunked()) {
+                    readingChunks = true;
+                } else {
+                    copyContentToTmpFile(response.getContent());
+                }
+            } else {
+                HttpChunk chunk = (HttpChunk) e.getMessage();
+                if (chunk.isLast()) {
+                    readingChunks = false;
+                    fos.close();
+                } else {
+                    copyContentToTmpFile(chunk.getContent());
+                }
+            }
+
+        }
+
+        private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
+            ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
+            ByteStreams.copy(cbis, fos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
new file mode 100644
index 0000000..f81b81b
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.util;
+
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Factory for remote file fetchers depending on the access protocol.
+ * 
+ */
+public class RemoteFileFetcher implements ArchiveFetcher {
+
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        String scheme = uri.getScheme();
+        if ("file".equalsIgnoreCase(scheme)) {
+            return new FileSystemArchiveFetcher().fetch(uri);
+        }
+        if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
+            return new HttpArchiveFetcher().fetch(uri);
+        }
+        throw new ArchiveFetchException("Unsupported protocol " + scheme);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 36417b4..bef0894 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -3,6 +3,3 @@ s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
 s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
 # I/O channel connection timeout, when applicable (e.g. used by netty)
 s4.comm.timeout=1000
-s4.cluster.zk_address = localhost:2181
-s4.cluster.zk_session_timeout = 10000
-s4.cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index c9153ce..1ffae1d 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -20,9 +20,9 @@ package org.apache.s4.comm.tcp;
 
 import java.io.IOException;
 
-import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.DeliveryTestUtil;
 import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.TestCommModule;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +45,11 @@ public abstract class TCPCommTest extends ProtocolTestUtil {
         super(numTasks);
     }
 
+    @Override
     public Injector newInjector() {
         try {
-            return Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                    .openStream(), CLUSTER_NAME));
+            return Guice.createInjector(new TestCommModule(Resources.getResource("default.s4.comm.properties")
+                    .openStream()));
         } catch (IOException e) {
             Assert.fail();
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 0f6004f..543efec 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -20,9 +20,9 @@ package org.apache.s4.comm.udp;
 
 import java.io.IOException;
 
-import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.DeliveryTestUtil;
 import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.TestCommModule;
 import org.junit.Assert;
 
 import com.google.common.io.Resources;
@@ -44,8 +44,8 @@ public abstract class UDPCommTest extends ProtocolTestUtil {
 
     @Override
     protected Injector newInjector() throws IOException {
-        return Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(),
-                "cluster1"), new UDPCommTestModule());
+        return Guice.createInjector(new TestCommModule(Resources.getResource("udp.s4.comm.properties").openStream()),
+                new UDPCommTestModule());
     }
 
     class UDPCommTestModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
new file mode 100644
index 0000000..c1fb253
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -0,0 +1,38 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.comm.util.RemoteFileFetcher;
+
+import com.google.inject.name.Names;
+
+/**
+ * Binds dependencies that come from the base layer and are defined in {@link BaseModule} in s4-core.
+ * 
+ * We need them injected for the tests to work, in particular for getting an assignment
+ * 
+ * 
+ */
+public class TestCommModule extends DefaultCommModule {
+
+    public TestCommModule(InputStream commConfigInputStream) {
+        super(commConfigInputStream);
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        bind(String.class).annotatedWith(Names.named("s4.cluster.name")).toInstance("cluster1");
+        bind(String.class).annotatedWith(Names.named("s4.cluster.zk_address")).toInstance("localhost:2181");
+        bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
+        bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
+        bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+        // bind(Cluster.class).to(ClusterFromZK.class);
+
+        bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
new file mode 100644
index 0000000..df2d8f1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -0,0 +1,80 @@
+package org.apache.s4.core;
+
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.comm.util.RemoteFileFetcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public class BaseModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(BaseModule.class);
+
+    private PropertiesConfiguration config;
+    InputStream baseConfigInputStream;
+    String clusterName;
+
+    public BaseModule(InputStream baseConfigInputStream, String clusterName) {
+        super();
+        this.baseConfigInputStream = baseConfigInputStream;
+        this.clusterName = clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        // a node holds a single partition assignment
+        // ==> Assignment is a singleton so it is shared between base, comm and app layers.
+        // it is eager so that the node is able to join a cluster immediately
+        bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+        // bind(Cluster.class).to(ClusterFromZK.class);
+
+        bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+        bind(S4Bootstrap.class);
+
+    }
+
+    @SuppressWarnings("serial")
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(baseConfigInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+            if (clusterName != null) {
+                if (config.containsKey("s4.cluster.name")) {
+                    logger.warn(
+                            "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
+                            clusterName, config.getProperty("s4.cluster.name"));
+                } else {
+                    Names.bindProperties(binder, new HashMap<String, String>() {
+                        {
+                            put("s4.cluster.name", clusterName);
+                        }
+                    });
+                }
+            }
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..1f07bf6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -18,6 +18,7 @@
 
 package org.apache.s4.core;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -36,8 +37,11 @@ import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 
 /**
@@ -85,6 +89,17 @@ public class DefaultCoreModule extends AbstractModule {
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
     }
 
+    @Provides
+    @Named("s4.tmp.dir")
+    public File provideTmpDir() {
+        File tmpS4Dir = Files.createTempDir();
+        tmpS4Dir.deleteOnExit();
+        logger.warn(
+                "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+                tmpS4Dir.getAbsolutePath());
+        return tmpS4Dir;
+    }
+
     private void loadProperties(Binder binder) {
         try {
             config = new PropertiesConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
deleted file mode 100644
index fc85219..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.core.util.ParametersInjectionModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
-
-/**
- * Bootstrap class for S4. It creates an S4 node.
- * 
- */
-public class Main {
-
-    private static final Logger logger = LoggerFactory.getLogger(Main.class);
-
-    /**
-     * Starts an S4 server.
-     * 
-     * @param args
-     */
-    public static void main(String[] args) {
-
-        MainArgs mainArgs = new MainArgs();
-        JCommander jc = new JCommander(mainArgs);
-
-        try {
-            jc.parse(args);
-        } catch (Exception e) {
-            JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
-            jc.usage();
-            System.exit(1);
-        }
-
-        startNode(mainArgs);
-    }
-
-    private static void startNode(MainArgs mainArgs) {
-        try {
-            Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-
-                @Override
-                public void uncaughtException(Thread t, Throwable e) {
-                    logger.error("Uncaught exception in thread {}", t.getName(), e);
-
-                }
-            });
-            Injector injector;
-            InputStream commConfigFileInputStream;
-            InputStream coreConfigFileInputStream;
-            String commConfigString;
-            if (mainArgs.commConfigFilePath == null) {
-                commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
-                commConfigString = "default.s4.comm.properties from classpath";
-            } else {
-                commConfigFileInputStream = new FileInputStream(new File(mainArgs.commConfigFilePath));
-                commConfigString = mainArgs.commConfigFilePath;
-            }
-
-            String coreConfigString;
-            if (mainArgs.coreConfigFilePath == null) {
-                coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
-                coreConfigString = "default.s4.core.properties from classpath";
-            } else {
-                coreConfigFileInputStream = new FileInputStream(new File(mainArgs.coreConfigFilePath));
-                coreConfigString = mainArgs.coreConfigFilePath;
-            }
-
-            logger.info(
-                    "Initializing S4 node with : \n- comm module class [{}]\n- comm configuration file [{}]\n- core module class [{}]\n- core configuration file[{}]\n- extra modules: {}\n- inline parameters: {}",
-                    new String[] { mainArgs.commModuleClass, commConfigString, mainArgs.coreModuleClass,
-                            coreConfigString, Arrays.toString(mainArgs.extraModulesClasses.toArray(new String[] {})),
-                            Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})) });
-
-            AbstractModule commModule = (AbstractModule) Class.forName(mainArgs.commModuleClass)
-                    .getConstructor(InputStream.class, String.class)
-                    .newInstance(commConfigFileInputStream, mainArgs.clusterName);
-            AbstractModule coreModule = (AbstractModule) Class.forName(mainArgs.coreModuleClass)
-                    .getConstructor(InputStream.class).newInstance(coreConfigFileInputStream);
-
-            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
-            for (String moduleClass : mainArgs.extraModulesClasses) {
-                extraModules.add((Module) Class.forName(moduleClass).newInstance());
-            }
-            Module combinedModule = Modules.combine(commModule, coreModule);
-            if (extraModules.size() > 0) {
-                OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
-                combinedModule = overridenModuleBuilder.with(extraModules);
-            }
-
-            if (mainArgs.zkConnectionString != null) {
-                mainArgs.extraNamedParameters.add("s4.cluster.zk_address=" + mainArgs.zkConnectionString);
-            }
-
-            if (!mainArgs.extraNamedParameters.isEmpty()) {
-                logger.debug("Adding named parameters for injection : {}",
-                        Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})));
-                Map<String, String> namedParameters = new HashMap<String, String>();
-
-                for (String namedParam : mainArgs.extraNamedParameters) {
-                    namedParameters.put(namedParam.split("[=]")[0].trim(),
-                            namedParam.substring(namedParam.indexOf('=') + 1).trim());
-                }
-                combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
-            }
-
-            injector = Guice.createInjector(combinedModule);
-
-            if (mainArgs.appClass != null) {
-                logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
-                App app = (App) injector.getInstance(Class.forName(mainArgs.appClass));
-                app.init();
-                app.start();
-            } else {
-                logger.info("Starting S4 node. This node will automatically download applications published for the cluster it belongs to");
-                Server server = injector.getInstance(Server.class);
-                try {
-                    server.start(injector);
-                } catch (Exception e) {
-                    logger.error("Failed to start the controller.", e);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Cannot start S4 node", e);
-            System.exit(1);
-        }
-    }
-
-    /**
-     * Defines command parameters.
-     * 
-     */
-    @Parameters(separators = "=")
-    public static class MainArgs {
-
-        @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
-        String clusterName = null;
-
-        @Parameter(names = "-commModuleClass", description = "configuration module class for the communication layer", required = false)
-        String commModuleClass = DefaultCommModule.class.getName();
-
-        @Parameter(names = "-commConfig", description = "s4 communication layer configuration file", required = false)
-        String commConfigFilePath;
-
-        @Parameter(names = "-coreModuleClass", description = "s4-core configuration module class", required = false)
-        String coreModuleClass = DefaultCoreModule.class.getName();
-
-        @Parameter(names = "-coreConfig", description = "s4 core configuration file", required = false)
-        String coreConfigFilePath = null;
-
-        @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
-        String appClass = null;
-
-        @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
-        List<String> extraModulesClasses = new ArrayList<String>();
-
-        @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
-        List<String> extraNamedParameters = new ArrayList<String>();
-
-        @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
-        String zkConnectionString;
-
-    }
-
-    /**
-     * Parameters parsing utility.
-     * 
-     */
-    public static class InlineConfigParameterConverter implements IStringConverter<String> {
-
-        @Override
-        public String convert(String arg) {
-            Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
-            logger.info("processing inline configuration parameter {}", arg);
-            Matcher parameterMatcher = parameterPattern.matcher(arg);
-            if (!parameterMatcher.find()) {
-                throw new IllegalArgumentException("Cannot understand parameter " + arg);
-            }
-            return parameterMatcher.group(1);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
new file mode 100644
index 0000000..deb14ad
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -0,0 +1,228 @@
+package org.apache.s4.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.ModulesLoaderFactory;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Named;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Modules.OverriddenModuleBuilder;
+
+/**
+ * This is the bootstrap for S4 nodes.
+ * <p>
+ * Its roles are to:
+ * <ul>
+ * <li>register within the S4 cluster (and acquire a partition).
+ * <li>look for an application deployed on the S4 cluster
+ * </ul>
+ * <p>
+ * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+ * application code is then downloaded and the app started.
+ * <p>
+ * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+ * application classes are available in the classpath.
+ * 
+ * 
+ * 
+ */
+public class S4Bootstrap {
+    private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
+
+    private final ZkClient zkClient;
+    private final String appPath;
+    private final AtomicBoolean deployed = new AtomicBoolean(false);
+
+    private final ArchiveFetcher fetcher;
+
+    private Injector parentInjector;
+
+    CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+
+    @Inject
+    public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
+
+        this.fetcher = fetcher;
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+
+        String appDir = "/s4/clusters/" + clusterName + "/app";
+        if (!zkClient.exists(appDir)) {
+            zkClient.create(appDir, null, CreateMode.PERSISTENT);
+        }
+        appPath = appDir + "/s4App";
+        zkClient.subscribeDataChanges(appPath, new AppChangeListener());
+    }
+
+    public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
+        this.parentInjector = parentInjector;
+        if (zkClient.exists(appPath)) {
+            if (!deployed.get()) {
+                loadModulesAndStartNode(parentInjector);
+            }
+        }
+
+        signalOneAppLoaded.await();
+    }
+
+    private void loadModulesAndStartNode(final Injector parentInjector) throws ArchiveFetchException {
+
+        final ZNRecord appData = zkClient.readData(appPath);
+        // appData can be null
+        final AppConfig appConfig = new AppConfig(appData);
+
+        String appName = appData.getSimpleField("name");
+
+        List<File> modulesLocalCopies = new ArrayList<File>();
+
+        for (String uriString : appConfig.getCustomModulesURIs()) {
+            modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+        }
+        final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+
+        Thread t = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                // start the S4 app; custom module and app classes are loaded through the modules classloader
+                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                signalOneAppLoaded.countDown();
+            }
+        }, "S4 platform loader");
+        t.start();
+
+    }
+
+    private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+
+        URI uri;
+        try {
+            uri = new URI(uriString);
+        } catch (URISyntaxException e2) {
+            throw new ArchiveFetchException("Invalid module URI : [" + uriString + "]", e2);
+        }
+        File localModuleFileCopy;
+        try {
+            localModuleFileCopy = File.createTempFile("tmp", "module");
+        } catch (IOException e1) {
+            logger.error(
+                    "Cannot deploy app [{}] because a local copy of the module file could not be initialized due to [{}]",
+                    appName, e1.getClass().getName() + "->" + e1.getMessage());
+            throw new ArchiveFetchException("Cannot deploy application [" + appName + "]", e1);
+        }
+        localModuleFileCopy.deleteOnExit();
+        try {
+            if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localModuleFileCopy)) == 0) {
+                throw new ArchiveFetchException("Cannot copy archive from [" + uri.toString() + "] to ["
+                        + localModuleFileCopy.getAbsolutePath() + "] (nothing was copied)");
+            }
+        } catch (Exception e) {
+            throw new ArchiveFetchException("Cannot deploy application [" + appName + "] from URI [" + uri.toString()
+                    + "] ", e);
+        }
+        return localModuleFileCopy;
+    }
+
+    public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+        try {
+            Injector injector;
+            InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+            InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+
+            logger.info("Initializing S4 app with : {}", appConfig.toString());
+
+            AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
+            AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
+
+            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+            for (String moduleClass : appConfig.getCustomModulesNames()) {
+                extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+            }
+            Module combinedModule = Modules.combine(commModule, coreModule);
+            if (extraModules.size() > 0) {
+                OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+                combinedModule = overridenModuleBuilder.with(extraModules);
+            }
+
+            if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+
+                logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+                Map<String, String> namedParameters = new HashMap<String, String>();
+
+                namedParameters.putAll(appConfig.getNamedParameters());
+                combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+            }
+
+            injector = parentInjector.createChildInjector(combinedModule);
+
+            if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
+                logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+                App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+                app.init();
+                app.start();
+            } else {
+                if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+                    logger.info("S4 node in standby until app class or app URI is specified");
+                }
+                Server server = injector.getInstance(Server.class);
+                server.start(injector);
+            }
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
+            System.exit(1);
+        }
+    }
+
+    class AppChangeListener implements IZkDataListener {
+
+        @Override
+        public void handleDataChange(String dataPath, Object data) throws Exception {
+            if (!deployed.get()) {
+                loadModulesAndStartNode(parentInjector);
+                deployed.set(true);
+            }
+
+        }
+
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+            logger.error("Application undeployment is not supported yet");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
new file mode 100644
index 0000000..4bd929b
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -0,0 +1,79 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+/**
+ * Entry point for starting an S4 node. It parses arguments and injects an {@link S4Bootstrap} based on the
+ * {@link BaseModule} minimal configuration.
+ * 
+ */
+public class S4Node {
+
+    private static Logger logger = LoggerFactory.getLogger(S4Node.class);
+
+    // Parses the command line into S4NodeArgs; on a parse failure prints the message and usage, then exits with 1.
+    public static void main(String[] args) throws InterruptedException, IOException {
+        S4NodeArgs s4Args = new S4NodeArgs();
+        JCommander jc = new JCommander(s4Args);
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+            jc.usage();
+            System.exit(1);
+        }
+        // arguments were parsed: hand over to the node startup
+        startNode(s4Args);
+    }
+
+    // Installs a default handler that logs uncaught exceptions, then creates the base injector and starts the bootstrap.
+    private static void startNode(S4NodeArgs mainArgs) throws InterruptedException, IOException {
+        UncaughtExceptionHandler loggingHandler = new UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Uncaught exception in thread {}", t.getName(), e);
+            }
+        };
+        Thread.setDefaultUncaughtExceptionHandler(loggingHandler);
+        Module baseModule = new BaseModule(Resources.getResource("default.s4.base.properties").openStream(),
+                mainArgs.clusterName);
+        Injector injector = Guice.createInjector(new Module[] { baseModule });
+        S4Bootstrap bootstrap = injector.getInstance(S4Bootstrap.class);
+        try {
+            bootstrap.start(injector);
+        } catch (ArchiveFetchException e1) {
+            logger.error("Cannot fetch module dependencies.", e1);
+        }
+    }
+
+    /**
+     * Defines command parameters.
+     * 
+     */
+    @Parameters(separators = "=")
+    public static class S4NodeArgs {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true)
+        String clusterName = null;
+
+        @Parameter(names = "-baseConfig", description = "S4 base configuration file", required = false)
+        String baseConfigFilePath = null;
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
+        String zkConnectionString;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 02e521a..d94d714 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -59,7 +59,7 @@ public class Server {
     @Inject
     private AssignmentFromZK assignment;
 
-    private ZkClient zkClient;
+    private final ZkClient zkClient;
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
index e5d4f9f..70a38cf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -45,12 +45,13 @@ import com.google.inject.name.Named;
  * Checkpoints are stored in individual files (1 file = 1 checkpointId) in directories according to the following
  * structure: <code>(storageRootpath)/prototypeId/checkpointId</code>
  * </p>
- *
+ * 
  */
 public class DefaultFileSystemStateStorage implements StateStorage {
 
     private static Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
-    @Inject(optional = true)
+
+    @Inject
     @Named("s4.checkpointing.filesystem.storageRootPath")
     String storageRootPath;
 
@@ -64,7 +65,6 @@ public class DefaultFileSystemStateStorage implements StateStorage {
      */
     @Inject
     public void init() {
-        checkStorageDir();
     }
 
     @Override
@@ -122,22 +122,6 @@ public class DefaultFileSystemStateStorage implements StateStorage {
         return id;
     }
 
-    public void checkStorageDir() {
-        if (storageRootPath == null) {
-
-            File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
-                    + "storage");
-            storageRootPath = defaultStorageDir.getAbsolutePath();
-            logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
-            if (!defaultStorageDir.exists()) {
-                if (!(defaultStorageDir.mkdirs())) {
-                    logger.error("Storage directory not specified, and cannot create default storage directory : "
-                            + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
-                }
-            }
-        }
-    }
-
     @Override
     public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
         File f = checkpointID2File(key, storageRootPath);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index ca23c79..5178c97 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -18,16 +18,44 @@
 
 package org.apache.s4.core.ft;
 
+import java.io.File;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
 
 /**
  * Checkpointing module that uses the {@link DefaultFileSystemStateStorage} as a checkpointing backend.
- *
+ * 
  */
 public class FileSystemBackendCheckpointingModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(FileSystemBackendCheckpointingModule.class);
+
     @Override
     protected void configure() {
         bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
         bind(CheckpointingFramework.class).to(SafeKeeper.class);
     }
+
+    // Default binding for the storage root path: (user.dir)/tmp/storage, created here when it does not exist.
+    @Provides
+    @Named("s4.checkpointing.filesystem.storageRootPath")
+    public String provideStorageRootPath() {
+        String sep = File.separator;
+        File fallbackDir = new File(System.getProperty("user.dir") + sep + "tmp" + sep + "storage");
+        String fallbackPath = fallbackDir.getAbsolutePath();
+        logger.warn("Unspecified storage dir; using default dir: {}", fallbackPath);
+        // a failed directory creation is only logged; the path is returned either way
+        if (!fallbackDir.exists() && !fallbackDir.mkdirs()) {
+            logger.error("Storage directory not specified, and cannot create default storage directory : "
+                    + fallbackPath + "\n Checkpointing and recovery will be disabled.");
+        }
+        return fallbackPath;
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
new file mode 100644
index 0000000..9bd09c7
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
@@ -0,0 +1,159 @@
+package org.apache.s4.core.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.s4.comm.topology.ZNRecord;
+
+public class AppConfig {
+
+    public static final String NAMED_PARAMETERS = "namedParams";
+    public static final String APP_CLASS = "appClass";
+    public static final String APP_NAME = "appName";
+    public static final String APP_URI = "S4R_URI";
+    public static final String MODULES_CLASSES = "modulesClasses";
+    public static final String MODULES_URIS = "modulesURIs";
+
+    String appName;
+    String appClassName;
+    List<String> customModulesNames = Collections.emptyList();
+    List<String> customModulesURIs = Collections.emptyList();
+    Map<String, String> namedParameters = Collections.emptyMap();
+    String appURI;
+
+    private AppConfig() {
+    }
+
+    public AppConfig(ZNRecord znRecord) {
+        appName = znRecord.getSimpleField(APP_NAME);
+        appClassName = znRecord.getSimpleField(APP_CLASS);
+        appURI = znRecord.getSimpleField(APP_URI);
+        customModulesNames = znRecord.getListField(MODULES_CLASSES);
+        customModulesURIs = znRecord.getListField(MODULES_URIS);
+        namedParameters = znRecord.getMapField(NAMED_PARAMETERS);
+    }
+
+    public AppConfig(String appName, String appClassName, String appURI, List<String> customModulesNames,
+            List<String> customModulesURIs, Map<String, String> namedParameters) {
+        super();
+        this.appName = appName;
+        this.appClassName = appClassName;
+        this.appURI = appURI;
+        this.customModulesNames = customModulesNames;
+        this.customModulesURIs = customModulesURIs;
+        this.namedParameters = namedParameters;
+    }
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public String getAppClassName() {
+        return appClassName;
+    }
+
+    public String getAppURI() {
+        return appURI;
+    }
+
+    public List<String> getCustomModulesNames() {
+        return customModulesNames;
+    }
+
+    public List<String> getCustomModulesURIs() {
+        return customModulesURIs;
+    }
+
+    public Map<String, String> getNamedParameters() {
+        return namedParameters;
+    }
+
+    public String getNamedParametersAsString() {
+        if (namedParameters == null || namedParameters.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, String> param : namedParameters.entrySet()) {
+            sb.append(param.getKey() + "=" + param.getValue() + ",");
+        }
+        return sb.toString();
+    }
+
+    public ZNRecord asZNRecord(String id) {
+        ZNRecord record = new ZNRecord(id);
+        if (appClassName != null) {
+            record.putSimpleField(APP_CLASS, appClassName);
+        }
+        if (appName != null) {
+            record.putSimpleField(APP_NAME, appName);
+        }
+        if (appURI != null) {
+            record.putSimpleField(APP_URI, appURI);
+        }
+        if (customModulesNames != null) {
+            record.putListField(MODULES_CLASSES, customModulesNames);
+        }
+        if (customModulesURIs != null) {
+            record.putListField(MODULES_URIS, customModulesURIs);
+        }
+        if (namedParameters != null) {
+            record.putMapField(NAMED_PARAMETERS, namedParameters);
+        }
+        return record;
+    }
+
+    // The null checks are parenthesized because '+' binds tighter than '==' and '?:'; without parentheses the
+    // result was only the modules URIs followed by "]", and a null list caused a NullPointerException.
+    @Override
+    public String toString() {
+        return "app name: [" + appName + "] \n " + "app class: [" + appClassName + "] \n" + "app URI : [" + appURI
+                + "] \n" + "modules classes : [" + (customModulesNames == null ? "" : customModulesNames.toString())
+                + " \n" + "modules URIs [" + (customModulesURIs == null ? "" : customModulesURIs.toString()) + "]";
+    }
+
+    public static class Builder {
+
+        AppConfig config;
+
+        public Builder() {
+            this.config = new AppConfig();
+        }
+
+        public Builder appName(String appName) {
+            config.appName = appName;
+            return this;
+        }
+
+        public Builder appClassName(String appClassName) {
+            config.appClassName = appClassName;
+            return this;
+        }
+
+        public Builder appURI(String appURI) {
+            config.appURI = appURI;
+            return this;
+        }
+
+        public Builder customModulesNames(List<String> customModulesNames) {
+            config.customModulesNames = customModulesNames;
+            return this;
+        }
+
+        public Builder customModulesURIs(List<String> customModulesURIs) {
+            config.customModulesURIs = customModulesURIs;
+            return this;
+        }
+
+        public Builder namedParameters(Map<String, String> namedParameters) {
+            config.namedParameters = namedParameters;
+            return this;
+        }
+
+        public AppConfig build() {
+            return config;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
index a20d3e6..bba20bc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
@@ -24,7 +24,8 @@ import com.google.inject.AbstractModule;
 import com.google.inject.name.Names;
 
 /**
- * Injects String parameters from a map. Used for loading parameters outside of config files.
+ * Injects String parameters from a map. Used for loading parameters outside of config files, typically parameters
+ * passed through the application configuration.
  * 
  */
 public class ParametersInjectionModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
new file mode 100644
index 0000000..cc404af
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -0,0 +1,59 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper for writing an application configuration to Zookeeper.
+ */
+public class DeploymentUtils {
+
+    private static Logger logger = LoggerFactory.getLogger(DeploymentUtils.class);
+
+    /**
+     * Creates the persistent <code>/s4/clusters/(clusterName)/app/s4App</code> node holding the given application
+     * configuration. The Zookeeper client opened for the operation is closed before returning, including when an
+     * exception is thrown.
+     *
+     * @param appConfig
+     *            configuration stored in the node, converted through {@link AppConfig#asZNRecord(String)}
+     * @param clusterName
+     *            name of the cluster, used to build the node path
+     * @param deleteIfExists
+     *            when true, an existing node is deleted first; when false, an existing node is kept and a warning
+     *            is logged
+     * @param zkString
+     *            passed to the {@link ZkClient} constructor (presumably the Zookeeper connection string)
+     * @throws RuntimeException
+     *             if creation reports that the node exists although deleteIfExists was set
+     */
+    public static void initAppConfig(AppConfig appConfig, String clusterName, boolean deleteIfExists, String zkString) {
+        String appConfigPath = "/s4/clusters/" + clusterName + "/app/s4App";
+        ZkClient zk = new ZkClient(zkString);
+        try {
+            ZkSerializer serializer = new ZNRecordSerializer();
+            zk.setZkSerializer(serializer);
+
+            if (deleteIfExists && zk.exists(appConfigPath)) {
+                zk.delete(appConfigPath);
+            }
+            zk.create(appConfigPath, appConfig.asZNRecord("app"), CreateMode.PERSISTENT);
+        } catch (ZkNodeExistsException e) {
+            if (deleteIfExists) {
+                // keep the original exception as the cause so the failure can be diagnosed
+                throw new RuntimeException("Node should have been deleted", e);
+            }
+            logger.warn("Node {} already exists, will not overwrite", appConfigPath);
+        } finally {
+            // runs on every path, so the client is not leaked when an exception is thrown
+            zk.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 9fc5c53..fc05c48 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -20,7 +20,6 @@ package org.apache.s4.deploy;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 
@@ -28,8 +27,10 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Server;
+import org.apache.s4.core.util.AppConfig;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,14 +79,17 @@ public class DistributedDeploymentManager implements DeploymentManager {
     private final Server server;
     boolean deployed = false;
 
+    private final ArchiveFetcher fetcher;
+
     @Inject
     public DistributedDeploymentManager(@Named("s4.cluster.name") String clusterName,
             @Named("s4.cluster.zk_address") String zookeeperAddress,
             @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
 
         this.clusterName = clusterName;
         this.server = server;
+        this.fetcher = fetcher;
 
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -99,10 +103,23 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
     public void deployApplication() throws DeploymentFailedException {
         ZNRecord appData = zkClient.readData(appPath);
-        String uriString = appData.getSimpleField(S4R_URI);
-        String appName = appData.getSimpleField("name");
+        AppConfig appConfig = new AppConfig(appData);
+        if (appConfig.getAppURI() == null) {
+            if (appConfig.getAppClassName() != null) {
+                try {
+                    App app = (App) getClass().getClassLoader().loadClass(appConfig.getAppClassName()).newInstance();
+                    server.startApp(app, "appName", clusterName);
+                } catch (Exception e) {
+                    logger.error("Cannot start application: cannot instantiate app class {} due to: {}",
+                            appConfig.getAppClassName(), e.getMessage());
+                    return;
+                }
+            }
+            logger.info("{} value not set for {} : no application code will be downloaded", S4R_URI, appPath);
+            return;
+        }
         try {
-            URI uri = new URI(uriString);
+            URI uri = new URI(appConfig.getAppURI());
 
             // fetch application
             File localS4RFileCopy;
@@ -111,50 +128,40 @@ public class DistributedDeploymentManager implements DeploymentManager {
             } catch (IOException e1) {
                 logger.error(
                         "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
-                        appName, e1.getClass().getName() + "->" + e1.getMessage());
-                throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
+                        appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e1);
             }
             localS4RFileCopy.deleteOnExit();
             try {
-                if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+                if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
                     throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
                             + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
                 }
-            } catch (IOException e) {
-                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
-                        + uri.toString() + "] ", e);
+            } catch (Exception e) {
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                        + "] from URI [" + uri.toString() + "] ", e);
             }
             // install locally
-            App loaded = server.loadApp(localS4RFileCopy, appName);
+            App loaded = server.loadApp(localS4RFileCopy, appConfig.getAppName());
             if (loaded != null) {
-                logger.info("Successfully installed application {}", appName);
+                logger.info("Successfully installed application {}", appConfig.getAppName());
                 // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
-                server.startApp(loaded, appName, clusterName);
+                server.startApp(loaded, appConfig.getAppName(), clusterName);
             } else {
-                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
-                        + uri.toString() + "] : cannot start application");
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                        + "] from URI [" + uri.toString() + "] : cannot start application");
             }
 
         } catch (URISyntaxException e) {
             logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
-                    appName, uriString, e.getMessage() });
-            throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
+                    appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() });
+            throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e);
         }
         deployed = true;
     }
 
     // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
     // but that's probably not that useful, and we can simply provide whichever protocol is needed
-    public InputStream fetchS4App(URI uri) throws DeploymentFailedException {
-        String scheme = uri.getScheme();
-        if ("file".equalsIgnoreCase(scheme)) {
-            return new FileSystemS4RFetcher().fetch(uri);
-        }
-        if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
-            return new HttpS4RFetcher().fetch(uri);
-        }
-        throw new DeploymentFailedException("Unsupported protocol " + scheme);
-    }
 
     private final class AppChangeListener implements IZkDataListener {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
deleted file mode 100644
index 8947998..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * Fetches S4R files from a file system, possibly distributed.
- * 
- */
-public class FileSystemS4RFetcher implements S4RFetcher {
-
-    @Override
-    public InputStream fetch(URI uri) throws DeploymentFailedException {
-        try {
-            return new FileInputStream(new File(uri));
-        } catch (FileNotFoundException e) {
-            throw new DeploymentFailedException("Cannot retrieve file from uri [" + uri.toString() + "]");
-        }
-    }
-}


Mime
View raw message