incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [1/7] Improve application and platform configuration
Date Wed, 23 Jan 2013 22:35:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
deleted file mode 100644
index 1bb4c01..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpClientCodec;
-import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.io.ByteStreams;
-
-/**
- * <p>
- * Fetches S4R archive through HTTP.
- * </p>
- * <p>
- * The underlying implementation uses Netty, and borrows code from the Netty snoop example.</br>
- * 
- * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
- *      snoop example</a>
- * 
- *      </p>
- */
-public class HttpS4RFetcher implements S4RFetcher {
-
-    private static Logger logger = LoggerFactory.getLogger(HttpS4RFetcher.class);
-
-    @Override
-    public InputStream fetch(URI uri) throws DeploymentFailedException {
-        logger.debug("Fetching file through http: {}", uri.toString());
-
-        String host = uri.getHost();
-        int port = uri.getPort();
-        if (port == -1) {
-            if (uri.getScheme().equalsIgnoreCase("http")) {
-                port = 80;
-            } else if (uri.getScheme().equalsIgnoreCase("https")) {
-                port = 443;
-            }
-        }
-
-        ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-        File tmpFile;
-        try {
-            tmpFile = File.createTempFile("http", "download");
-        } catch (IOException e) {
-            throw new DeploymentFailedException("Cannot create temporary file for fetching s4r data from http server",
-                    e);
-        }
-        clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
-        ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
-        // TODO timeout?
-        Channel channel = channelFuture.awaitUninterruptibly().getChannel();
-        if (!channelFuture.isSuccess()) {
-            clientBootstrap.releaseExternalResources();
-            throw new DeploymentFailedException("Cannot connect to http uri [" + uri.toString() + "]",
-                    channelFuture.getCause());
-        }
-
-        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
-        request.setHeader(HttpHeaders.Names.HOST, host);
-        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
-        channel.write(request);
-
-        channel.getCloseFuture().awaitUninterruptibly();
-
-        clientBootstrap.releaseExternalResources();
-
-        logger.debug("Finished downloading s4r file through http {}", uri.toString());
-        try {
-            return new FileInputStream(tmpFile);
-        } catch (FileNotFoundException e) {
-            throw new DeploymentFailedException("Cannot get input stream from temporary file with s4r data ["
-                    + tmpFile.getAbsolutePath() + "]");
-        }
-    }
-
-    private class HttpClientPipelineFactory implements ChannelPipelineFactory {
-
-        File tmpFile;
-
-        public HttpClientPipelineFactory(File tmpFile) {
-            this.tmpFile = tmpFile;
-        }
-
-        @Override
-        public ChannelPipeline getPipeline() throws Exception {
-            // Create a default pipeline implementation.
-            ChannelPipeline pipeline = Channels.pipeline();
-
-            pipeline.addLast("codec", new HttpClientCodec());
-
-            // Remove the following line if you don't want automatic content decompression.
-            pipeline.addLast("inflater", new HttpContentDecompressor());
-
-            pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
-            return pipeline;
-        }
-    }
-
-    // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
-    private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
-
-        private boolean readingChunks;
-        FileOutputStream fos;
-
-        public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
-            this.fos = new FileOutputStream(tmpFile);
-        }
-
-        @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-            if (!readingChunks) {
-                HttpResponse response = (HttpResponse) e.getMessage();
-
-                if (response.isChunked()) {
-                    readingChunks = true;
-                } else {
-                    copyContentToTmpFile(response.getContent());
-                }
-            } else {
-                HttpChunk chunk = (HttpChunk) e.getMessage();
-                if (chunk.isLast()) {
-                    readingChunks = false;
-                    fos.close();
-                } else {
-                    copyContentToTmpFile(chunk.getContent());
-                }
-            }
-
-        }
-
-        private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
-            ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
-            ByteStreams.copy(cbis, fos);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
deleted file mode 100644
index 9c6590f..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * This interface defines methods to fetch S4R archive files from a URI. Various protocols can be supported in the
- * implementation classes (e.g. file system, HTTP etc...)
- * 
- */
-public interface S4RFetcher {
-
-    /**
-     * Returns a stream to an S4R archive file
-     * 
-     * @param uri
-     *            S4R archive identifier
-     * @return an input stream for accessing the content of the S4R file
-     * @throws DeploymentFailedException
-     *             when fetching fails
-     */
-    InputStream fetch(URI uri) throws DeploymentFailedException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index 6001595..d1a557c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -48,6 +48,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.name.Names;
 
 public class CheckpointingTest {
 
@@ -159,8 +160,11 @@ public class CheckpointingTest {
 
     private static class MockCoreModuleWithFileBaseCheckpointingBackend extends MockCoreModule {
 
+        @Override
         protected void configure() {
             super.configure();
+            bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+                    DEFAULT_STORAGE_DIR.getAbsolutePath());
             bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
             bind(CheckpointingFramework.class).to(SafeKeeper.class);
             bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index c3d30b7..8d3d324 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -28,6 +28,8 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
@@ -38,6 +40,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Injector;
 
 public class FTWordCountTest extends ZkBasedTest {
@@ -74,6 +78,7 @@ public class FTWordCountTest extends ZkBasedTest {
 
         injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
         signalSentence1Processed.await(10, TimeUnit.SECONDS);
+        // TODO replace with zk-notified-latch
         Thread.sleep(1000);
 
         // crash the app
@@ -96,6 +101,7 @@ public class FTWordCountTest extends ZkBasedTest {
         injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
 
         sentence2Processed.await(10, TimeUnit.SECONDS);
+        // TODO replace with zk-notified-latch
         Thread.sleep(1000);
 
         // crash the app
@@ -108,7 +114,7 @@ public class FTWordCountTest extends ZkBasedTest {
             zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
         injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
-        Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalTextProcessed.await(40, TimeUnit.SECONDS));
         String results = new String(zk.getData("/results", false, null));
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
 
@@ -125,12 +131,17 @@ public class FTWordCountTest extends ZkBasedTest {
     private void restartNode() throws IOException, InterruptedException {
         CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream");
 
+        DeploymentUtils.initAppConfig(
+                new AppConfig.Builder()
+                        .appClassName(FTWordCountApp.class.getName())
+                        .namedParameters(
+                                ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+                                        CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath(),
+                                        "s4.checkpointing.storageMaxThreads", "3"))
+                        .customModulesNames(ImmutableList.of(FileSystemBackendCheckpointingModule.class.getName()))
+                        .build(), "cluster1", false, "localhost:2181");
         // recovering and making sure checkpointing still works
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
-                FTWordCountApp.class.getName(), "-p",
-                "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
-                "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
index 6b8dc4a..54e9ed9 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
@@ -19,17 +19,14 @@
 package org.apache.s4.core.ft;
 
 import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
-import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.inject.name.Names;
-
 /**
  * Creates the /checkpointed znode if a successful checkpointing callback is received. Does it only once.
- *
+ * 
  */
 public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule extends
         FileSystemBackendCheckpointingModule {
@@ -38,8 +35,7 @@ public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule exte
     protected void configure() {
         super.configure();
         bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
-        bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
-                CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+
     }
 
     public static class DummyZKStorageCallbackFactory implements StorageCallbackFactory {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index f2b0297..2f4f742 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -31,6 +31,9 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
@@ -38,6 +41,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Injector;
 
 public class RecoveryTest extends ZkBasedTest {
@@ -99,9 +104,19 @@ public class RecoveryTest extends ZkBasedTest {
         // use a latch for waiting for app to be ready
         CountDownLatch signalConsumerReady = getConsumerReadySignal("inputStream");
 
+        DeploymentUtils.initAppConfig(
+                new AppConfig.Builder()
+                        .appClassName(appClass.getName())
+                        .namedParameters(
+                                ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+                                        CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath()))
+                        .customModulesNames(ImmutableList.of(backendModuleClass.getName())).build(), "cluster1", true,
+                "localhost:2181");
+        // recovering and making sure checkpointing still works
+
         // 1. instantiate remote S4 app
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass", appClass.getName(),
-                "-extraModulesClasses", backendModuleClass.getName() });
+
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
 
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
 
@@ -131,8 +146,16 @@ public class RecoveryTest extends ZkBasedTest {
         zk.delete("/data", -1);
 
         signalConsumerReady = getConsumerReadySignal("inputStream");
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
-                S4AppWithManualCheckpointing.class.getName(), "-extraModulesClasses", backendModuleClass.getName() });
+        DeploymentUtils.initAppConfig(
+                new AppConfig.Builder()
+                        .appClassName(S4AppWithManualCheckpointing.class.getName())
+                        .namedParameters(
+                                ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+                                        CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath()))
+                        .customModulesNames(ImmutableList.of(backendModuleClass.getName())).build(), "cluster1", true,
+                "localhost:2181");
+
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
 
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
         // // trigger recovery by sending application event to set value 2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
new file mode 100644
index 0000000..5113a62
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
@@ -0,0 +1,131 @@
+package org.apache.s4.core.moduleloader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.BaseModule;
+import org.apache.s4.core.S4Node;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.WordCountApp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestModuleLoader extends ZkBasedTest {
+
+    private static final int NB_MESSAGES = 10;
+    private Process forkS4Node;
+    private TCPEmitter emitter;
+    private Injector injector;
+
+    public TestModuleLoader() {
+        super(2);
+    }
+
+    @Test
+    public void testLocal() throws Exception {
+        testModuleLoader(false);
+    }
+
+    protected void testModuleLoader(boolean fork) throws Exception {
+
+        // build custom-modules.jar
+        File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+                + "/test-apps/custom-modules/build.gradle"), "clean", new String[0]);
+
+        File modulesJarFile = new File(gradlewFile.getParentFile().getAbsolutePath()
+                + "/test-apps/custom-modules/build/libs/app/custom-modules.jar");
+
+        Assert.assertFalse(modulesJarFile.exists());
+
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+                + "/test-apps/custom-modules/build.gradle"), "jar", new String[0]);
+
+        // make sure it is created
+        Assert.assertTrue(modulesJarFile.exists());
+
+        // pass it as a configuration
+        DeploymentUtils.initAppConfig(
+                new AppConfig.Builder().appClassName(WordCountApp.class.getName())
+                        .customModulesURIs(ImmutableList.of(modulesJarFile.toURI().toString()))
+                        .customModulesNames(ImmutableList.of("org.apache.s4.TestListenerModule")).build(), "cluster1",
+                true, "localhost:2181");
+        if (fork) {
+            forkS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+        } else {
+            S4Node.main(new String[] { "-c", "cluster1" });
+        }
+
+        // injector = Guice.createInjector(new BaseModule(
+        // Resources.getResource("default.s4.base.properties").openStream(), "cluster1"),
+        // new CommModuleWithoutListener(Resources.getResource("default.s4.comm.properties").openStream()));
+
+        injector = Guice.createInjector(new BaseModule(
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1"), new DefaultCommModule(
+                Resources.getResource("default.s4.comm.properties").openStream()));
+
+        Emitter emitter = injector.getInstance(TCPEmitter.class);
+        List<Long> messages = Lists.newArrayList();
+        for (int i = 0; i < NB_MESSAGES; i++) {
+            messages.add(System.currentTimeMillis());
+        }
+
+        ZkClient zkClient = new ZkClient("localhost:2181");
+        zkClient.create("/test", 0, CreateMode.PERSISTENT);
+
+        final ZooKeeper zk = CommTestUtils.createZkClient();
+        final CountDownLatch signalMessagesReceived = new CountDownLatch(1);
+
+        // watch for last message in test data sequence
+        CoreTestUtils.watchAndSignalCreation("/test/data" + Strings.padStart(String.valueOf(NB_MESSAGES - 1), 10, '0'),
+                signalMessagesReceived, zk);
+
+        for (Long message : messages) {
+            Event event = new Event();
+            event.put("message", long.class, message);
+            emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+                    .serialize(event)));
+        }
+
+        // check sequential nodes in zk with correct data
+        Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+        List<String> children = zkClient.getChildren("/test");
+        for (String child : children) {
+            Long data = zkClient.readData("/test/" + child);
+            Assert.assertTrue(messages.contains(data));
+        }
+
+    }
+
+    @After
+    public void cleanUp() throws IOException, InterruptedException {
+        CoreTestUtils.killS4App(forkS4Node);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
new file mode 100644
index 0000000..0ebbe8b
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
@@ -0,0 +1,12 @@
+package org.apache.s4.core.moduleloader;
+
+import org.junit.Test;
+
+// separated from its parent in order to start with clean environment with forked tests
+public class TestModuleLoaderRemote extends TestModuleLoader {
+
+    @Test
+    public void testRemote() throws Exception {
+        testModuleLoader(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1fbd37a..d2e2c6a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -37,12 +37,11 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
@@ -113,9 +112,11 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
                 CommTestUtils.createZkClient());
 
-        ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-        zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true, "localhost:2181");
+
+        // ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+        // record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+        // zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
 
         Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -153,7 +154,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
                 Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
                         + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
 
-        // we start a
+        // we start a simple web server
         InetSocketAddress addr = new InetSocketAddress(8080);
         httpServer = HttpServer.create(addr, 0);
 
@@ -166,6 +167,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         // check resource loading (we use a zkclient without custom serializer)
         ZkClient client2 = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         Assert.assertEquals("Salut!", client2.readData("/resourceData"));
+        client2.close();
 
     }
 
@@ -192,6 +194,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
             }
         });
 
+        // CoreTestUtils.initAppConfig(new AppConfig.Builder().build(), true);
+
         forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
 
         // TODO synchro with ready state from zk

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 442bbcd..b48d371 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -32,12 +32,11 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -139,15 +138,13 @@ public class TestProducerConsumer {
             }
         });
 
-        ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
-        zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT);
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uriConsumer).build(), CONSUMER_CLUSTER, true,
+                "localhost:2181");
         // TODO check that consumer app is ready with a better way than checking stream consumers
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
 
-        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
-        zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uriProducer).build(), PRODUCER_CLUSTER, true,
+                "localhost:2181");
 
         // that may be a bit long to complete...
         Assert.assertTrue(signalConsumptionComplete.await(30, TimeUnit.SECONDS));
@@ -194,8 +191,6 @@ public class TestProducerConsumer {
         forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER });
         forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER });
 
-        // TODO synchro with ready state from zk
-        // Thread.sleep(10000);
         Assert.assertTrue(signalProcessesReady.await(20, TimeUnit.SECONDS));
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 7e88dd8..9b24f3c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,8 +26,9 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.BaseModule;
 import org.apache.s4.core.DefaultCoreModule;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
 import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
 import org.gradle.tooling.ProjectConnection;
@@ -57,7 +58,7 @@ public class CoreTestUtils extends CommTestUtils {
     }
 
     public static Process forkS4Node(int debugPort, String[] args) throws IOException, InterruptedException {
-        return forkProcess(Main.class.getName(), debugPort, args);
+        return forkProcess(S4Node.class.getName(), debugPort, args);
     }
 
     public static File findGradlewInRootDir() {
@@ -115,8 +116,10 @@ public class CoreTestUtils extends CommTestUtils {
 
     public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
         return Guice.createInjector(Modules.override(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1"),
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
                 new NonFailFastZookeeperClientsModule()));
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 1c3ba84..7147894 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -63,7 +63,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
                 }
             }
 
-            System.out.println("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
+            logger.info("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
 
             if (!counts.containsKey(wcEvent.getWord())
                     || (counts.containsKey(wcEvent.getWord()) && counts.get(wcEvent.getWord()).compareTo(

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index db45fcb..f322256 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -29,14 +29,16 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.BaseModule;
 import org.apache.s4.core.DefaultCoreModule;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.io.Resources;
@@ -54,22 +56,13 @@ public class WordCountTest extends ZkBasedTest {
     public static final String FLAG = ";";
     public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
     private TCPEmitter emitter;
-    private Injector injector;
+    Injector injector;
 
-    // private static Factory zookeeperServerConnectionFactory;
-
-    // @Before
-    // public void prepare() throws IOException, InterruptedException, KeeperException {
-    // CommTestUtils.cleanupTmpDirs();
-    // zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
-    //
-    // }
-
-    @Before
-    public void prepareEmitter() throws IOException {
-        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                .openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
-                .openStream()));
+    public void createEmitter() throws IOException {
+        injector = Guice.createInjector(new BaseModule(
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1"), new DefaultCommModule(
+                Resources.getResource("default.s4.comm.properties").openStream()), new DefaultCoreModule(Resources
+                .getResource("default.s4.core.properties").openStream()));
 
         emitter = injector.getInstance(TCPEmitter.class);
 
@@ -92,9 +85,12 @@ public class WordCountTest extends ZkBasedTest {
     @Test
     public void testSimple() throws Exception {
         final ZooKeeper zk = CommTestUtils.createZkClient();
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(WordCountApp.class.getName()).build(),
+                "cluster1", true, "localhost:2181");
+        S4Node.main(new String[] { "-cluster=cluster1", });
 
-        Main.main(new String[] { "-cluster=cluster1", "-appClass=" + WordCountApp.class.getName(),
-                "-extraModulesClasses=" + WordCountModule.class.getName() });
+        // we create the emitter now, it will share zk node assignment with the S4 node
+        createEmitter();
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
@@ -114,6 +110,8 @@ public class WordCountTest extends ZkBasedTest {
     public void injectSentence(String sentence) throws IOException {
         Event event = new Event();
         event.put("sentence", String.class, sentence);
+
+        // NOTE: we send to partition 1 since partition 0 hosts the emitter
         emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
                 .serialize(event)));
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
index 1b9f310..3041a94 100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@ -21,6 +21,7 @@ package org.apache.s4.edsl;
 import java.lang.reflect.Field;
 
 import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.BaseModule;
 import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Test;
@@ -35,9 +36,11 @@ public class TestEDSL extends ZkBasedTest {
 
     @Test
     public void test() throws Exception {
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = Guice
+                .createInjector(new BaseModule(Resources.getResource("default.s4.base.properties").openStream(),
+                        "cluster1"), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                        .openStream()), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
+                        .openStream()));
         MyApp myApp = injector.getInstance(MyApp.class);
 
         /* Normally. the container will handle this but this is just a test. */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 94e359c..e39d00f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -22,21 +22,25 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-import org.apache.zookeeper.CreateMode;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
 import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
 import org.gradle.tooling.ProgressListener;
 import org.gradle.tooling.ProjectConnection;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.FileConverter;
+import com.beust.jcommander.internal.Maps;
 import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
@@ -101,25 +105,12 @@ public class Deploy extends S4ArgsBase {
                 }
             }
 
-            final String uri = s4rToDeploy.toURI().toString();
-            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            record.putSimpleField("name", deployArgs.appName);
-            String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
-            if (zkClient.exists(deployedAppPath)) {
-                ZNRecord readData = zkClient.readData(deployedAppPath);
-                logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
-                        readData.getSimpleField("name"));
-                System.exit(1);
-            }
-
-            zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/app/s4App", record, CreateMode.PERSISTENT);
-            logger.info(
-                    "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
-                    new String[] { deployArgs.appName, deployArgs.clusterName,
-                            "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName,
-                            s4rToDeploy.getAbsolutePath() });
-
+            DeploymentUtils.initAppConfig(
+                    new AppConfig.Builder().appName(deployArgs.appName).appURI(s4rToDeploy.toURI().toString())
+                            .customModulesNames(deployArgs.modulesClassesNames)
+                            .customModulesURIs(deployArgs.modulesURIs)
+                            .namedParameters(convertListArgsToMap(deployArgs.extraNamedParameters)).build(),
+                    deployArgs.clusterName, false, deployArgs.zkConnectionString);
             // Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
             System.exit(0);
         } catch (Exception e) {
@@ -128,6 +119,18 @@ public class Deploy extends S4ArgsBase {
 
     }
 
+    private static Map<String, String> convertListArgsToMap(List<String> args) {
+        Map<String, String> result = Maps.newHashMap();
+        for (String arg : args) {
+            String[] split = arg.split("[=]");
+            if (!(split.length == 2)) {
+                throw new RuntimeException("Invalid args: " + Arrays.toString(args.toArray(new String[] {})));
+            }
+            result.put(split[0], split[1]);
+        }
+        return result;
+    }
+
     @Parameters(commandNames = "s4 deploy", commandDescription = "Package and deploy application to S4 cluster", separators = "=")
     static class DeployAppArgs extends S4ArgsBase {
 
@@ -155,6 +158,33 @@ public class Deploy extends S4ArgsBase {
         @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
         int timeout = 10000;
 
+        @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+        List<String> modulesURIs = new ArrayList<String>();
+
+        @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+        List<String> modulesClassesNames = new ArrayList<String>();
+
+        @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+        List<String> extraNamedParameters = new ArrayList<String>();
+
+    }
+
+    /**
+     * Parameters parsing utility.
+     * 
+     */
+    public static class InlineConfigParameterConverter implements IStringConverter<String> {
+
+        @Override
+        public String convert(String arg) {
+            Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
+            logger.info("processing inline configuration parameter {}", arg);
+            Matcher parameterMatcher = parameterPattern.matcher(arg);
+            if (!parameterMatcher.find()) {
+                throw new IllegalArgumentException("Cannot understand parameter " + arg);
+            }
+            return parameterMatcher.group(1);
+        }
     }
 
     static class ExecGradle {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index dbd63b9..832cfcc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +39,8 @@ public class Tools {
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
     enum Task {
-        deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
-                CreateApp.class), s4r(Package.class), status(Status.class);
+        deploy(Deploy.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(
+                null), newApp(CreateApp.class), s4r(Package.class), status(Status.class);
 
         Class<?> target;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index ed71fb0..4893928 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -108,7 +108,7 @@ public class ZKServer {
                 + "zookeeper" + File.separator + "data").getAbsolutePath();
 
         @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
-        boolean clean = true;
+        boolean clean = false;
 
         @Parameter(names = "-logDir", description = "log directory")
         String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator


Mime
View raw message