incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dfe...@apache.org
Subject [1/2] git commit: S4-117 Simplify bootstrapping - remove deployment manager: the role of S4Bootstrap is to fetch necessary code and start the app - remove Server class: S4Bootstrap loads and start the app
Date Tue, 05 Mar 2013 14:44:15 GMT
S4-117 Simplify bootstrapping
- remove deployment manager: the role of S4Bootstrap is to fetch necessary code
and start the app
- remove Server class: S4Bootstrap loads and start the app


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/f83a82ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/f83a82ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/f83a82ae

Branch: refs/heads/dev
Commit: f83a82ae4c8ef7bf2ecbb07e9cc6fd867f55b4ad
Parents: 755ed6b
Author: Matthieu Morel <mmorel@apache.org>
Authored: Fri Mar 1 19:02:35 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Fri Mar 1 19:14:50 2013 +0100

----------------------------------------------------------------------
 README.md                                          |    2 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    4 -
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |  234 +++++++++++----
 .../src/main/java/org/apache/s4/core/Server.java   |  145 ---------
 .../org/apache/s4/deploy/DeploymentManager.java    |   30 --
 .../s4/deploy/DistributedDeploymentManager.java    |  194 ------------
 .../apache/s4/deploy/NoOpDeploymentManager.java    |   31 --
 .../org/apache/s4/core/ft/FTWordCountTest.java     |    4 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |    8 +-
 .../core/moduleloader/ModuleLoaderTestUtils.java   |    3 +-
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   19 +-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |    8 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   44 +++-
 .../org/apache/s4/fixtures/MockCoreModule.java     |    4 -
 .../src/main/java/org/apache/s4/tools/Status.java  |    3 +
 .../test/java/org/apache/s4/tools/TestDeploy.java  |   23 +--
 16 files changed, 229 insertions(+), 527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 2d6b3ef..e60fd46 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ Documentation
 
 For the latest information about S4, please visit our website at:
 
-   http://inbubator.apache.org/s4
+   http://incubator.apache.org/s4
 
 and our wiki, at:
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index c4cafee..32642f0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -38,8 +38,6 @@ import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,8 +82,6 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
-
         bind(S4RLoaderFactory.class);
 
         // For enabling checkpointing, one needs to use a custom module, such as

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 77dbc67..01e241f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -11,24 +11,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.jar.Attributes.Name;
+import java.util.jar.JarFile;
 
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.base.util.S4RLoader;
+import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.ModulesLoaderFactory;
 import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.s4.deploy.DeploymentFailedException;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
@@ -46,20 +48,22 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
  * Its roles are to:
  * <ul>
  * <li>register within the S4 cluster (and acquire a partition).
- * <li>look for application deployed on the S4 cluster
+ * <li>wait for an application to be published on the S4 cluster
  * </ul>
  * <p>
  * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
  * application code is then downloaded and the app started.
  * <p>
- * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
- * application classes are available in the classpath.
+ * For testing purposes, it is also possible to start an application without packaging code, provided the application
+ * classes are available in the classpath.
  * 
  * 
  * 
  */
 public class S4Bootstrap {
     private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
+    public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
+    public static final String S4R_URI = "s4r_uri";
 
     private final ZkClient zkClient;
     private final String appPath;
@@ -72,16 +76,10 @@ public class S4Bootstrap {
     CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
 
     @Inject
-    public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
+    public S4Bootstrap(@Named("s4.cluster.name") String clusterName, ZkClient zkClient, ArchiveFetcher fetcher) {
 
         this.fetcher = fetcher;
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
-
+        this.zkClient = zkClient;
         String appDir = "/s4/clusters/" + clusterName + "/app";
         if (!zkClient.exists(appDir)) {
             zkClient.create(appDir, null, CreateMode.PERSISTENT);
@@ -94,14 +92,14 @@ public class S4Bootstrap {
         this.parentInjector = parentInjector;
         if (zkClient.exists(appPath)) {
             if (!deployed.get()) {
-                loadModulesAndStartNode(parentInjector);
+                loadModulesAndStartApp(parentInjector);
             }
         }
 
         signalOneAppLoaded.await();
     }
 
-    private void loadModulesAndStartNode(final Injector parentInjector) throws ArchiveFetchException {
+    private void loadModulesAndStartApp(final Injector parentInjector) throws ArchiveFetchException {
 
         final ZNRecord appData = zkClient.readData(appPath);
         // can be null
@@ -121,7 +119,7 @@ public class S4Bootstrap {
             @Override
             public void run() {
                 // load app class through modules classloader and start it
-                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                startS4App(appConfig, parentInjector, modulesLoader);
                 signalOneAppLoaded.countDown();
             }
         }, "S4 platform loader");
@@ -129,6 +127,96 @@ public class S4Bootstrap {
 
     }
 
+    private void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+        try {
+            App app = loadApp(appConfig, modulesLoader);
+            app.init();
+            app.start();
+
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
+            System.exit(1);
+        }
+    }
+
+    private App loadApp(AppConfig appConfig, ClassLoader modulesLoader) throws DeploymentFailedException {
+
+        Module combinedPlatformModule;
+        try {
+            combinedPlatformModule = loadPlatformModules(appConfig, modulesLoader);
+        } catch (Exception e) {
+            throw new DeploymentFailedException("Cannot load platform modules", e);
+        }
+
+        if (appConfig.getAppURI() == null) {
+            if (appConfig.getAppClassName() != null) {
+                try {
+                    // In that case we won't be using an S4R classloader, app classes are available from the current
+                    // classloader
+                    // The app module provides bindings specific to the app class loader, in this case the current
+                    // thread's
+                    // class loader.
+                    AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
+                    // NOTE: because the app module can be overriden
+                    Module combinedModule = Modules.override(appModule).with(combinedPlatformModule);
+                    Injector injector = parentInjector.createChildInjector(combinedModule);
+                    logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+                    return (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+
+                    // server.startApp(app, "appName", clusterName);
+                } catch (Exception e) {
+                    throw new DeploymentFailedException(String.format(
+                            "Cannot start application: cannot instantiate app class %s due to: %s",
+                            appConfig.getAppClassName(), e.getMessage()), e);
+                }
+            } else {
+                throw new DeploymentFailedException(
+                        "Application class name must be specified when application URI omitted");
+            }
+        } else {
+            try {
+                URI uri = new URI(appConfig.getAppURI());
+
+                // fetch application
+                File localS4RFileCopy;
+                try {
+                    localS4RFileCopy = File.createTempFile("tmp", "s4r");
+                } catch (IOException e1) {
+                    logger.error(
+                            "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+                            appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
+                    throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]",
+                            e1);
+                }
+                localS4RFileCopy.deleteOnExit();
+                try {
+                    if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+                        throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+                                + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
+                    }
+                } catch (Exception e) {
+                    throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                            + "] from URI [" + uri.toString() + "] ", e);
+                }
+                // install locally
+                Injector injector = parentInjector.createChildInjector(combinedPlatformModule);
+
+                App loadedApp = loadS4R(injector, localS4RFileCopy, appConfig.getAppName());
+                if (loadedApp != null) {
+                    return loadedApp;
+                } else {
+                    throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                            + "] from URI [" + uri.toString() + "] : cannot start application");
+                }
+
+            } catch (URISyntaxException e) {
+                throw new DeploymentFailedException(String.format(
+                        "Cannot deploy application [%s] : invalid URI for fetching S4R archive %s : %s", new Object[] {
+                                appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() }), e);
+            }
+        }
+    }
+
     private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
 
         URI uri;
@@ -159,61 +247,79 @@ public class S4Bootstrap {
         return localModuleFileCopy;
     }
 
-    public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
-        try {
-            Injector injector;
-            InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
-            InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+    private static Module loadPlatformModules(AppConfig appConfig, ClassLoader modulesLoader) throws IOException,
+            InstantiationException, IllegalAccessException, ClassNotFoundException {
+        InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+        InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
 
-            logger.info("Initializing S4 app with : {}", appConfig.toString());
+        logger.info("Initializing S4 app with : {}", appConfig.toString());
 
-            AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
-            AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
+        AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
+        AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
 
-            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
-            for (String moduleClass : appConfig.getCustomModulesNames()) {
-                extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
-            }
-            Module combinedModule = Modules.combine(commModule, coreModule);
-            if (extraModules.size() > 0) {
-                OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
-                combinedModule = overridenModuleBuilder.with(extraModules);
-            }
+        List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+        for (String moduleClass : appConfig.getCustomModulesNames()) {
+            extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+        }
+        Module combinedModule = Modules.combine(commModule, coreModule);
+        if (extraModules.size() > 0) {
+            OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+            combinedModule = overridenModuleBuilder.with(extraModules);
+        }
 
-            if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+        if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
 
-                logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
-                Map<String, String> namedParameters = new HashMap<String, String>();
+            logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+            Map<String, String> namedParameters = new HashMap<String, String>();
 
-                namedParameters.putAll(appConfig.getNamedParameters());
-                combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
-            }
+            namedParameters.putAll(appConfig.getNamedParameters());
+            combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+        }
+        return combinedModule;
+    }
 
-            if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
-                // In that case we won't be using an S4R classloader, app classes are available from the current
-                // classloader
-                // The app module provides bindings specific to the app class loader, in this case the current thread's
-                // class loader.
-                AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
-                // NOTE: because the app module can be overriden
-                combinedModule = Modules.override(appModule).with(combinedModule);
-                injector = parentInjector.createChildInjector(combinedModule);
-                logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
-                App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
-                app.init();
-                app.start();
-            } else {
-                injector = parentInjector.createChildInjector(combinedModule);
-                if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
-                    logger.info("S4 node in standby until app class or app URI is specified");
-                }
-                Server server = injector.getInstance(Server.class);
-                server.start(injector);
+    private App loadS4R(Injector injector, File s4r, String appName) {
+
+        // TODO handle application upgrade
+        logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
+
+        S4RLoaderFactory loaderFactory = injector.getInstance(S4RLoaderFactory.class);
+        S4RLoader appClassLoader = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
+        try {
+            JarFile s4rFile = new JarFile(s4r);
+            if (s4rFile.getManifest() == null) {
+                logger.warn("Cannot load s4r archive [{}] : missing manifest file");
+                return null;
             }
-        } catch (Exception e) {
-            logger.error("Cannot start S4 node", e);
-            System.exit(1);
+            if (!s4rFile.getManifest().getMainAttributes().containsKey(new Name(MANIFEST_S4_APP_CLASS))) {
+                logger.warn("Cannot load s4r archive [{}] : missing attribute [{}] in manifest", s4r.getAbsolutePath(),
+                        MANIFEST_S4_APP_CLASS);
+                return null;
+            }
+            String appClassName = s4rFile.getManifest().getMainAttributes().getValue(MANIFEST_S4_APP_CLASS);
+            logger.info("App class name is: " + appClassName);
+            App app = null;
+
+            try {
+                Object o = (appClassLoader.loadClass(appClassName)).newInstance();
+                app = (App) o;
+                // we use the app module to provide bindings that depend upon a classloader savy of app classes, e.g.
+                // for serialization/deserialization
+                AppModule appModule = new AppModule(appClassLoader);
+                injector.createChildInjector(appModule).injectMembers(app);
+            } catch (Exception e) {
+                logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
+                return null;
+            }
+
+            logger.info("Loaded application from file {}", s4r.getAbsolutePath());
+            signalOneAppLoaded.countDown();
+            return app;
+        } catch (IOException e) {
+            logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
+            return null;
         }
+
     }
 
     class AppChangeListener implements IZkDataListener {
@@ -221,7 +327,7 @@ public class S4Bootstrap {
         @Override
         public void handleDataChange(String dataPath, Object data) throws Exception {
             if (!deployed.get()) {
-                loadModulesAndStartNode(parentInjector);
+                loadModulesAndStartApp(parentInjector);
                 deployed.set(true);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
deleted file mode 100644
index d344698..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.jar.Attributes.Name;
-import java.util.jar.JarFile;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.base.util.S4RLoader;
-import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DeploymentManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.name.Named;
-
-/**
- * The Server instance coordinates activities in a cluster node including loading and unloading of applications and
- * instantiating the communication layer.
- */
-public class Server {
-
-    private static final Logger logger = LoggerFactory.getLogger(Server.class);
-
-    private final String logLevel;
-    public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
-    CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
-
-    private Injector injector;
-
-    @Inject
-    private DeploymentManager deploymentManager;
-
-    @Inject
-    private AssignmentFromZK assignment;
-
-    private final ZkClient zkClient;
-
-    /**
-     *
-     */
-    @Inject
-    public Server(String commModuleName, @Named("s4.logger_level") String logLevel,
-            @Named("s4.cluster.name") String clusterName, @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) {
-        this.logLevel = logLevel;
-
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        zkClient.setZkSerializer(new ZNRecordSerializer());
-    }
-
-    public void start(Injector injector) throws Exception {
-
-        this.injector = injector;
-        /* Set up logger basic configuration. */
-        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
-                .getLogger(Logger.ROOT_LOGGER_NAME);
-        root.setLevel(Level.toLevel(logLevel));
-
-        if (deploymentManager != null) {
-            deploymentManager.start();
-        }
-
-        // wait for an app to be loaded (otherwise the server would not have anything to do and just die)
-        signalOneAppLoaded.await();
-
-    }
-
-    public App loadApp(File s4r, String appName) {
-
-        // TODO handle application upgrade
-        logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
-
-        S4RLoaderFactory loaderFactory = injector.getInstance(S4RLoaderFactory.class);
-        S4RLoader appClassLoader = loaderFactory.createS4RLoader(s4r.getAbsolutePath());
-        try {
-            JarFile s4rFile = new JarFile(s4r);
-            if (s4rFile.getManifest() == null) {
-                logger.warn("Cannot load s4r archive [{}] : missing manifest file");
-                return null;
-            }
-            if (!s4rFile.getManifest().getMainAttributes().containsKey(new Name(MANIFEST_S4_APP_CLASS))) {
-                logger.warn("Cannot load s4r archive [{}] : missing attribute [{}] in manifest", s4r.getAbsolutePath(),
-                        MANIFEST_S4_APP_CLASS);
-                return null;
-            }
-            String appClassName = s4rFile.getManifest().getMainAttributes().getValue(MANIFEST_S4_APP_CLASS);
-            logger.info("App class name is: " + appClassName);
-            App app = null;
-
-            try {
-                Object o = (appClassLoader.loadClass(appClassName)).newInstance();
-                app = (App) o;
-                // we use the app module to provide bindings that depend upon a classloader savy of app classes, e.g.
-                // for serialization/deserialization
-                AppModule appModule = new AppModule(appClassLoader);
-                injector.createChildInjector(appModule).injectMembers(app);
-            } catch (Exception e) {
-                logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
-                return null;
-            }
-
-            logger.info("Loaded application from file {}", s4r.getAbsolutePath());
-            signalOneAppLoaded.countDown();
-            return app;
-        } catch (IOException e) {
-            logger.error("Could not load s4 application form s4r file [{" + s4r.getAbsolutePath() + "}]", e);
-            return null;
-        }
-
-    }
-
-    public void startApp(App app, String appName, String clusterName) {
-
-        app.init();
-
-        app.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
deleted file mode 100644
index e92e97d..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-/**
- * Marker interface for deployment managers. Allows to supply a no-op deployment manager through dependency injection.
- * (TODO that hack should be improved!)
- * 
- */
-public interface DeploymentManager {
-
-    void start();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
deleted file mode 100644
index fc05c48..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.App;
-import org.apache.s4.core.Server;
-import org.apache.s4.core.util.AppConfig;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * 
- * <p>
- * Monitors application availability on a given s4 cluster. Starts the application when available.
- * </p>
- * 
- * <p>
- * More specifically, this class observes the children of <code>/&lt;s4-cluster-name&gt;/apps</code>. Children
- * correspond to S4 applications. A child's metadata contains a URI that refers to the s4r file that contains the s4
- * application code.
- * </p>
- * 
- * <p>
- * At startup, existing applications are loaded by, for each detected application:
- * <ol>
- * <li>reading the associated URI
- * <li>fetching the s4r archive from that URI, through the protocol specified in the URI, and copying the s4r to a local
- * directory. Protocol handlers are not currently pluggable and must be implemented in this class.
- * <li>loading and starting the application
- * </ol>
- * 
- * <p>
- * Then, whenever new app children are detected, the deployment manager re-executes the above steps for those new
- * applications
- * </p>
- */
-public class DistributedDeploymentManager implements DeploymentManager {
-
-    public static final String S4R_URI = "s4r_uri";
-
-    private static Logger logger = LoggerFactory.getLogger(DistributedDeploymentManager.class);
-
-    private final String clusterName;
-
-    private final ZkClient zkClient;
-    private final String appPath;
-    private final Server server;
-    boolean deployed = false;
-
-    private final ArchiveFetcher fetcher;
-
-    @Inject
-    public DistributedDeploymentManager(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
-
-        this.clusterName = clusterName;
-        this.server = server;
-        this.fetcher = fetcher;
-
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        zkClient.setZkSerializer(new ZNRecordSerializer());
-        String appDir = "/s4/clusters/" + clusterName + "/app";
-        if (!zkClient.exists(appDir)) {
-            zkClient.create(appDir, null, CreateMode.PERSISTENT);
-        }
-        appPath = appDir + "/s4App";
-        zkClient.subscribeDataChanges(appPath, new AppChangeListener());
-    }
-
-    public void deployApplication() throws DeploymentFailedException {
-        ZNRecord appData = zkClient.readData(appPath);
-        AppConfig appConfig = new AppConfig(appData);
-        if (appConfig.getAppURI() == null) {
-            if (appConfig.getAppClassName() != null) {
-                try {
-                    App app = (App) getClass().getClassLoader().loadClass(appConfig.getAppClassName()).newInstance();
-                    server.startApp(app, "appName", clusterName);
-                } catch (Exception e) {
-                    logger.error("Cannot start application: cannot instantiate app class {} due to: {}",
-                            appConfig.getAppClassName(), e.getMessage());
-                    return;
-                }
-            }
-            logger.info("{} value not set for {} : no application code will be downloaded", S4R_URI, appPath);
-            return;
-        }
-        try {
-            URI uri = new URI(appConfig.getAppURI());
-
-            // fetch application
-            File localS4RFileCopy;
-            try {
-                localS4RFileCopy = File.createTempFile("tmp", "s4r");
-            } catch (IOException e1) {
-                logger.error(
-                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
-                        appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e1);
-            }
-            localS4RFileCopy.deleteOnExit();
-            try {
-                if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
-                    throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
-                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
-                }
-            } catch (Exception e) {
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
-                        + "] from URI [" + uri.toString() + "] ", e);
-            }
-            // install locally
-            App loaded = server.loadApp(localS4RFileCopy, appConfig.getAppName());
-            if (loaded != null) {
-                logger.info("Successfully installed application {}", appConfig.getAppName());
-                // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
-                server.startApp(loaded, appConfig.getAppName(), clusterName);
-            } else {
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
-                        + "] from URI [" + uri.toString() + "] : cannot start application");
-            }
-
-        } catch (URISyntaxException e) {
-            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
-                    appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() });
-            throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e);
-        }
-        deployed = true;
-    }
-
-    // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
-    // but that's probably not that useful, and we can simply provide whichever protocol is needed
-
-    private final class AppChangeListener implements IZkDataListener {
-        @Override
-        public void handleDataDeleted(String dataPath) throws Exception {
-            logger.error("Application undeployment is not supported yet");
-        }
-
-        @Override
-        public void handleDataChange(String dataPath, Object data) throws Exception {
-            if (!deployed) {
-                deployApplication();
-            } else {
-                logger.error("There is already an application deployed on this S4 node");
-            }
-
-        }
-
-    }
-
-    @Override
-    public void start() {
-        if (zkClient.exists(appPath)) {
-            try {
-                deployApplication();
-            } catch (DeploymentFailedException e) {
-                logger.error("Cannot deploy application", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
deleted file mode 100644
index 27777d8..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-/**
- * Does not handle any deployment (hence does not require any cluster configuration settings)
- * 
- */
-public class NoOpDeploymentManager implements DeploymentManager {
-
-    @Override
-    public void start() {
-        // does nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 50846b4..0b595b6 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -27,6 +27,7 @@ import junit.framework.Assert;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
@@ -143,7 +144,8 @@ public class FTWordCountTest extends ZkBasedTest {
                         .customModulesNames(ImmutableList.of(FileSystemBackendCheckpointingModule.class.getName()))
                         .build(), "cluster1", false, "localhost:2181");
         // recovering and making sure checkpointing still works
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"), 10,
+                "cluster1");
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 4dc9484..fedb857 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -46,7 +46,7 @@ import com.google.inject.Injector;
 
 public class RecoveryTest extends ZkBasedTest {
 
-    private Process forkedS4App = null;
+    Process forkedS4App = null;
 
     @After
     public void cleanup() throws Exception {
@@ -118,7 +118,8 @@ public class RecoveryTest extends ZkBasedTest {
 
         // 1. instantiate remote S4 app
 
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"), 10,
+                "cluster1");
 
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
 
@@ -160,7 +161,8 @@ public class RecoveryTest extends ZkBasedTest {
                         .customModulesNames(ImmutableList.of(backendModuleClass.getName())).build(), "cluster1", true,
                 "localhost:2181");
 
-        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"), 10,
+                "cluster1");
 
         Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
         // // trigger recovery by sending application event to set value 2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index e5d0d01..ceb610d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -62,7 +62,8 @@ public class ModuleLoaderTestUtils {
                         .customModulesNames(ImmutableList.of("org.apache.s4.TestListenerModule")).build(), "cluster1",
                 true, "localhost:2181");
         if (fork) {
-            forkedS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+            forkedS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"),
+                    10, "cluster1");
         } else {
             S4Node.main(new String[] { "-c", "cluster1" });
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 8b5329b..c71e640 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -27,12 +27,12 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.Assert;
 
 import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
@@ -171,24 +171,9 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
 
-        final CountDownLatch signalNodeReady = new CountDownLatch(1);
-
-        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
-
-            @Override
-            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-                if (currentChilds.size() == 1) {
-                    signalNodeReady.countDown();
-                }
-
-            }
-        });
-
         checkNoAppAlreadyDeployed(zkClient);
 
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
-
-        Assert.assertTrue(signalNodeReady.await(10, TimeUnit.SECONDS));
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" }, zkClient, 10, "cluster1");
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 31c6a34..6d74666 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -27,12 +27,12 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.Assert;
 
 import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
@@ -188,8 +188,10 @@ public class TestProducerConsumer {
             }
         });
 
-        forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER });
-        forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER });
+        forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER }, zkClient, 20,
+                PRODUCER_CLUSTER);
+        forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER }, zkClient, 20,
+                CONSUMER_CLUSTER);
 
         Assert.assertTrue(signalProcessesReady.await(20, TimeUnit.SECONDS));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 9b24f3c..099cee2 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -22,10 +22,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
+import org.I0Itec.zkclient.IZkChildListener;
 import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.BaseModule;
 import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.S4Node;
@@ -33,8 +37,6 @@ import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
 import org.gradle.tooling.ProjectConnection;
 
-import sun.net.ProgressListener;
-
 import com.google.common.io.PatternFilenameFilter;
 import com.google.common.io.Resources;
 import com.google.inject.Guice;
@@ -50,15 +52,41 @@ import com.google.inject.util.Modules;
 public class CoreTestUtils extends CommTestUtils {
 
     public static Process forkS4Node() throws IOException, InterruptedException {
-        return forkS4Node(new String[] {});
+        ZkClient zkClient = new ZkClient("localhost:2181");
+        int waitTimeInSeconds = 10;
+        return forkS4Node(new String[] {}, zkClient, waitTimeInSeconds, "cluster1");
     }
 
-    public static Process forkS4Node(String[] args) throws IOException, InterruptedException {
-        return forkS4Node(-1, args);
+    public static Process forkS4Node(String[] args, ZkClient zkClient, int waitTimeInSeconds, String clusterName)
+            throws IOException, InterruptedException {
+        return forkS4Node(-1, args, zkClient, waitTimeInSeconds, clusterName);
     }
 
-    public static Process forkS4Node(int debugPort, String[] args) throws IOException, InterruptedException {
-        return forkProcess(S4Node.class.getName(), debugPort, args);
+    public static Process forkS4Node(int debugPort, String[] args, ZkClient zkClient, int waitTimeInSeconds,
+            String clusterName) throws IOException, InterruptedException {
+
+        final CountDownLatch signalNodeReady = new CountDownLatch(1);
+
+        zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 1) {
+                    signalNodeReady.countDown();
+                }
+
+            }
+        });
+        Process forked = forkProcess(S4Node.class.getName(), debugPort, args);
+
+        Assert.assertTrue(signalNodeReady.await(waitTimeInSeconds, TimeUnit.SECONDS));
+
+        // having picked a partition does not mean having downloaded and loaded the platform and app code. Need to wait
+        // a bit more
+        // TODO add a zookeeper node for indicating node ready state?
+        Thread.sleep(1000);
+
+        return forked;
     }
 
     public static File findGradlewInRootDir() {
@@ -104,7 +132,7 @@ public class CoreTestUtils extends CommTestUtils {
             build.withArguments(buildArgs.toArray(new String[] {}));
 
             // if you want to listen to the progress events:
-            ProgressListener listener = null; // use your implementation
+            // ProgressListener listener = null; // use your implementation
 
             // kick the build off:
             build.run();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 7900ecc..7bacf3b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -26,8 +26,6 @@ import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
 import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.NoOpDeploymentManager;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +47,6 @@ public class MockCoreModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
-
         // Although we want to mock as much as possible, most tests still require the machinery for routing events
         // within a stream/node, therefore sender and stream executors are not mocked
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 09ae37d..11fe2cd 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -205,6 +205,9 @@ public class Status extends S4ArgsBase {
     }
 
     private static String inMiddle(String content, int width) {
+        if (content == null) {
+            content = "UNDEFINED";
+        }
         int i = (width - content.length()) / 2;
         return String.format("%" + i + "s%s", " ", content);
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f83a82ae/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java b/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
index cff7359..f0563e1 100644
--- a/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
+++ b/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
@@ -20,15 +20,11 @@ package org.apache.s4.tools;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.deploy.AppConstants;
 import org.apache.s4.deploy.TestAutomaticDeployment;
 import org.apache.s4.fixtures.CommTestUtils;
@@ -53,24 +49,9 @@ public class TestDeploy extends ZkBasedTest {
         ZkClient zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
 
-        final CountDownLatch signalNodeReady = new CountDownLatch(1);
-
-        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
-
-            @Override
-            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-                if (currentChilds.size() == 1) {
-                    signalNodeReady.countDown();
-                }
-
-            }
-        });
-
         TestAutomaticDeployment.checkNoAppAlreadyDeployed(zkClient);
 
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
-
-        Assert.assertTrue(signalNodeReady.await(10, TimeUnit.SECONDS));
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" }, zkClient, 10, "cluster1");
 
         // deploy app
 


Mime
View raw message