http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
deleted file mode 100644
index 1bb4c01..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpClientCodec;
-import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.io.ByteStreams;
-
-/**
- * <p>
- * Fetches S4R archive through HTTP.
- * </p>
- * <p>
- * The underlying implementation uses Netty, and borrows code from the Netty snoop example.</br>
- *
- * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
- * snoop example</a>
- *
- * </p>
- */
-public class HttpS4RFetcher implements S4RFetcher {
-
- private static Logger logger = LoggerFactory.getLogger(HttpS4RFetcher.class);
-
- @Override
- public InputStream fetch(URI uri) throws DeploymentFailedException {
- logger.debug("Fetching file through http: {}", uri.toString());
-
- String host = uri.getHost();
- int port = uri.getPort();
- if (port == -1) {
- if (uri.getScheme().equalsIgnoreCase("http")) {
- port = 80;
- } else if (uri.getScheme().equalsIgnoreCase("https")) {
- port = 443;
- }
- }
-
- ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
- File tmpFile;
- try {
- tmpFile = File.createTempFile("http", "download");
- } catch (IOException e) {
- throw new DeploymentFailedException("Cannot create temporary file for fetching s4r data from http server",
- e);
- }
- clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
- ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
- // TODO timeout?
- Channel channel = channelFuture.awaitUninterruptibly().getChannel();
- if (!channelFuture.isSuccess()) {
- clientBootstrap.releaseExternalResources();
- throw new DeploymentFailedException("Cannot connect to http uri [" + uri.toString() + "]",
- channelFuture.getCause());
- }
-
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
- request.setHeader(HttpHeaders.Names.HOST, host);
- request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
- request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
- channel.write(request);
-
- channel.getCloseFuture().awaitUninterruptibly();
-
- clientBootstrap.releaseExternalResources();
-
- logger.debug("Finished downloading s4r file through http {}", uri.toString());
- try {
- return new FileInputStream(tmpFile);
- } catch (FileNotFoundException e) {
- throw new DeploymentFailedException("Cannot get input stream from temporary file with s4r data ["
- + tmpFile.getAbsolutePath() + "]");
- }
- }
-
- private class HttpClientPipelineFactory implements ChannelPipelineFactory {
-
- File tmpFile;
-
- public HttpClientPipelineFactory(File tmpFile) {
- this.tmpFile = tmpFile;
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
-
- pipeline.addLast("codec", new HttpClientCodec());
-
- // Remove the following line if you don't want automatic content decompression.
- pipeline.addLast("inflater", new HttpContentDecompressor());
-
- pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
- return pipeline;
- }
- }
-
- // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
- private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
-
- private boolean readingChunks;
- FileOutputStream fos;
-
- public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
- this.fos = new FileOutputStream(tmpFile);
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (!readingChunks) {
- HttpResponse response = (HttpResponse) e.getMessage();
-
- if (response.isChunked()) {
- readingChunks = true;
- } else {
- copyContentToTmpFile(response.getContent());
- }
- } else {
- HttpChunk chunk = (HttpChunk) e.getMessage();
- if (chunk.isLast()) {
- readingChunks = false;
- fos.close();
- } else {
- copyContentToTmpFile(chunk.getContent());
- }
- }
-
- }
-
- private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
- ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
- ByteStreams.copy(cbis, fos);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
deleted file mode 100644
index 9c6590f..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * This interface defines methods to fetch S4R archive files from a URI. Various protocols can be supported in the
- * implementation classes (e.g. file system, HTTP etc...)
- *
- */
-public interface S4RFetcher {
-
- /**
- * Returns a stream to an S4R archive file
- *
- * @param uri
- * S4R archive identifier
- * @return an input stream for accessing the content of the S4R file
- * @throws DeploymentFailedException
- * when fetching fails
- */
- InputStream fetch(URI uri) throws DeploymentFailedException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index 6001595..d1a557c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -48,6 +48,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.name.Names;
public class CheckpointingTest {
@@ -159,8 +160,11 @@ public class CheckpointingTest {
private static class MockCoreModuleWithFileBaseCheckpointingBackend extends MockCoreModule {
+ @Override
protected void configure() {
super.configure();
+ bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+ DEFAULT_STORAGE_DIR.getAbsolutePath());
bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
bind(CheckpointingFramework.class).to(SafeKeeper.class);
bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index c3d30b7..8d3d324 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -28,6 +28,8 @@ import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
@@ -38,6 +40,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
public class FTWordCountTest extends ZkBasedTest {
@@ -74,6 +78,7 @@ public class FTWordCountTest extends ZkBasedTest {
injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
signalSentence1Processed.await(10, TimeUnit.SECONDS);
+ // TODO: replace this fixed sleep with a latch released by a ZooKeeper notification
Thread.sleep(1000);
// crash the app
@@ -96,6 +101,7 @@ public class FTWordCountTest extends ZkBasedTest {
injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
sentence2Processed.await(10, TimeUnit.SECONDS);
+ // TODO: replace this fixed sleep with a latch released by a ZooKeeper notification
Thread.sleep(1000);
// crash the app
@@ -108,7 +114,7 @@ public class FTWordCountTest extends ZkBasedTest {
zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
- Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(signalTextProcessed.await(40, TimeUnit.SECONDS));
String results = new String(zk.getData("/results", false, null));
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
@@ -125,12 +131,17 @@ public class FTWordCountTest extends ZkBasedTest {
private void restartNode() throws IOException, InterruptedException {
CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream");
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder()
+ .appClassName(FTWordCountApp.class.getName())
+ .namedParameters(
+ ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+ CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath(),
+ "s4.checkpointing.storageMaxThreads", "3"))
+ .customModulesNames(ImmutableList.of(FileSystemBackendCheckpointingModule.class.getName()))
+ .build(), "cluster1", false, "localhost:2181");
// recovering and making sure checkpointing still works
- forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
- FTWordCountApp.class.getName(), "-p",
- "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
- "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
index 6b8dc4a..54e9ed9 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.java
@@ -19,17 +19,14 @@
package org.apache.s4.core.ft;
import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
-import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
-import com.google.inject.name.Names;
-
/**
* Creates the /checkpointed znode if a successful checkpointing callback is received. Does it only once.
- *
+ *
*/
public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule extends
FileSystemBackendCheckpointingModule {
@@ -38,8 +35,7 @@ public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule exte
protected void configure() {
super.configure();
bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
- bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
- CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+
}
public static class DummyZKStorageCallbackFactory implements StorageCallbackFactory {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index f2b0297..2f4f742 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -31,6 +31,9 @@ import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
@@ -38,6 +41,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
public class RecoveryTest extends ZkBasedTest {
@@ -99,9 +104,19 @@ public class RecoveryTest extends ZkBasedTest {
// use a latch for waiting for app to be ready
CountDownLatch signalConsumerReady = getConsumerReadySignal("inputStream");
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder()
+ .appClassName(appClass.getName())
+ .namedParameters(
+ ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+ CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath()))
+ .customModulesNames(ImmutableList.of(backendModuleClass.getName())).build(), "cluster1", true,
+ "localhost:2181");
+ // app class, parameters and custom module are passed through the app config above; the node is only given the cluster name
+
// 1. instantiate remote S4 app
- forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass", appClass.getName(),
- "-extraModulesClasses", backendModuleClass.getName() });
+
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
@@ -131,8 +146,16 @@ public class RecoveryTest extends ZkBasedTest {
zk.delete("/data", -1);
signalConsumerReady = getConsumerReadySignal("inputStream");
- forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
- S4AppWithManualCheckpointing.class.getName(), "-extraModulesClasses", backendModuleClass.getName() });
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder()
+ .appClassName(S4AppWithManualCheckpointing.class.getName())
+ .namedParameters(
+ ImmutableMap.of("s4.checkpointing.filesystem.storageRootPath",
+ CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath()))
+ .customModulesNames(ImmutableList.of(backendModuleClass.getName())).build(), "cluster1", true,
+ "localhost:2181");
+
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
// // trigger recovery by sending application event to set value 2
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
new file mode 100644
index 0000000..5113a62
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
@@ -0,0 +1,131 @@
+package org.apache.s4.core.moduleloader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.BaseModule;
+import org.apache.s4.core.S4Node;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.WordCountApp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestModuleLoader extends ZkBasedTest {
+
+ private static final int NB_MESSAGES = 10;
+ private Process forkS4Node;
+ private TCPEmitter emitter;
+ private Injector injector;
+
+ public TestModuleLoader() {
+ super(2);
+ }
+
+ @Test
+ public void testLocal() throws Exception {
+ testModuleLoader(false);
+ }
+
+ protected void testModuleLoader(boolean fork) throws Exception {
+
+ // build custom-modules.jar
+ File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build.gradle"), "clean", new String[0]);
+
+ File modulesJarFile = new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build/libs/app/custom-modules.jar");
+
+ Assert.assertFalse(modulesJarFile.exists());
+
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build.gradle"), "jar", new String[0]);
+
+ // make sure it is created
+ Assert.assertTrue(modulesJarFile.exists());
+
+ // pass it as a configuration
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder().appClassName(WordCountApp.class.getName())
+ .customModulesURIs(ImmutableList.of(modulesJarFile.toURI().toString()))
+ .customModulesNames(ImmutableList.of("org.apache.s4.TestListenerModule")).build(), "cluster1",
+ true, "localhost:2181");
+ if (fork) {
+ forkS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+ } else {
+ S4Node.main(new String[] { "-c", "cluster1" });
+ }
+
+ // injector = Guice.createInjector(new BaseModule(
+ // Resources.getResource("default.s4.base.properties").openStream(), "cluster1"),
+ // new CommModuleWithoutListener(Resources.getResource("default.s4.comm.properties").openStream()));
+
+ injector = Guice.createInjector(new BaseModule(
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1"), new DefaultCommModule(
+ Resources.getResource("default.s4.comm.properties").openStream()));
+
+ Emitter emitter = injector.getInstance(TCPEmitter.class);
+ List<Long> messages = Lists.newArrayList();
+ for (int i = 0; i < NB_MESSAGES; i++) {
+ messages.add(System.currentTimeMillis());
+ }
+
+ ZkClient zkClient = new ZkClient("localhost:2181");
+ zkClient.create("/test", 0, CreateMode.PERSISTENT);
+
+ final ZooKeeper zk = CommTestUtils.createZkClient();
+ final CountDownLatch signalMessagesReceived = new CountDownLatch(1);
+
+ // watch for last message in test data sequence
+ CoreTestUtils.watchAndSignalCreation("/test/data" + Strings.padStart(String.valueOf(NB_MESSAGES - 1), 10, '0'),
+ signalMessagesReceived, zk);
+
+ for (Long message : messages) {
+ Event event = new Event();
+ event.put("message", long.class, message);
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+ }
+
+ // check sequential nodes in zk with correct data
+ Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+ List<String> children = zkClient.getChildren("/test");
+ for (String child : children) {
+ Long data = zkClient.readData("/test/" + child);
+ Assert.assertTrue(messages.contains(data));
+ }
+
+ }
+
+ @After
+ public void cleanUp() throws IOException, InterruptedException {
+ CoreTestUtils.killS4App(forkS4Node);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
new file mode 100644
index 0000000..0ebbe8b
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoaderRemote.java
@@ -0,0 +1,12 @@
+package org.apache.s4.core.moduleloader;
+
+import org.junit.Test;
+
+// Kept separate from its parent class so that the forked-node test starts with a clean environment
+public class TestModuleLoaderRemote extends TestModuleLoader {
+
+ @Test
+ public void testRemote() throws Exception {
+ testModuleLoader(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1fbd37a..d2e2c6a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -37,12 +37,11 @@ import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.util.AppConfig;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.jboss.netty.handler.codec.http.HttpHeaders;
@@ -113,9 +112,11 @@ public class TestAutomaticDeployment extends ZkBasedTest {
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
CommTestUtils.createZkClient());
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true, "localhost:2181");
+
+ // ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ // record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+ // zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -153,7 +154,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
- // we start a
+ // we start a simple web server
InetSocketAddress addr = new InetSocketAddress(8080);
httpServer = HttpServer.create(addr, 0);
@@ -166,6 +167,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
// check resource loading (we use a zkclient without custom serializer)
ZkClient client2 = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
Assert.assertEquals("Salut!", client2.readData("/resourceData"));
+ client2.close();
}
@@ -192,6 +194,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
}
});
+ // CoreTestUtils.initAppConfig(new AppConfig.Builder().build(), true);
+
forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
// TODO synchro with ready state from zk
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 442bbcd..b48d371 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -32,12 +32,11 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -139,15 +138,13 @@ public class TestProducerConsumer {
}
});
- ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
- zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT);
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uriConsumer).build(), CONSUMER_CLUSTER, true,
+ "localhost:2181");
// TODO check that consumer app is ready with a better way than checking stream consumers
Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
- ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
- zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uriProducer).build(), PRODUCER_CLUSTER, true,
+ "localhost:2181");
// that may be a bit long to complete...
Assert.assertTrue(signalConsumptionComplete.await(30, TimeUnit.SECONDS));
@@ -194,8 +191,6 @@ public class TestProducerConsumer {
forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER });
forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER });
- // TODO synchro with ready state from zk
- // Thread.sleep(10000);
Assert.assertTrue(signalProcessesReady.await(20, TimeUnit.SECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 7e88dd8..9b24f3c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,8 +26,9 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.BaseModule;
import org.apache.s4.core.DefaultCoreModule;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
import org.gradle.tooling.BuildLauncher;
import org.gradle.tooling.GradleConnector;
import org.gradle.tooling.ProjectConnection;
@@ -57,7 +58,7 @@ public class CoreTestUtils extends CommTestUtils {
}
public static Process forkS4Node(int debugPort, String[] args) throws IOException, InterruptedException {
- return forkProcess(Main.class.getName(), debugPort, args);
+ return forkProcess(S4Node.class.getName(), debugPort, args);
}
public static File findGradlewInRootDir() {
@@ -115,8 +116,10 @@ public class CoreTestUtils extends CommTestUtils {
public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
return Guice.createInjector(Modules.override(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1"),
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
new NonFailFastZookeeperClientsModule()));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 1c3ba84..7147894 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -63,7 +63,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
}
}
- System.out.println("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
+ logger.info("seen: " + wcEvent.getWord() + "/" + wcEvent.getCount());
if (!counts.containsKey(wcEvent.getWord())
|| (counts.containsKey(wcEvent.getWord()) && counts.get(wcEvent.getWord()).compareTo(
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index db45fcb..f322256 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -29,14 +29,16 @@ import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.BaseModule;
import org.apache.s4.core.DefaultCoreModule;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.Before;
import org.junit.Test;
import com.google.common.io.Resources;
@@ -54,22 +56,13 @@ public class WordCountTest extends ZkBasedTest {
public static final String FLAG = ";";
public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
private TCPEmitter emitter;
- private Injector injector;
+ Injector injector;
- // private static Factory zookeeperServerConnectionFactory;
-
- // @Before
- // public void prepare() throws IOException, InterruptedException, KeeperException {
- // CommTestUtils.cleanupTmpDirs();
- // zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
- //
- // }
-
- @Before
- public void prepareEmitter() throws IOException {
- injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), "cluster1"), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
- .openStream()));
+ public void createEmitter() throws IOException {
+ injector = Guice.createInjector(new BaseModule(
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1"), new DefaultCommModule(
+ Resources.getResource("default.s4.comm.properties").openStream()), new DefaultCoreModule(Resources
+ .getResource("default.s4.core.properties").openStream()));
emitter = injector.getInstance(TCPEmitter.class);
@@ -92,9 +85,12 @@ public class WordCountTest extends ZkBasedTest {
@Test
public void testSimple() throws Exception {
final ZooKeeper zk = CommTestUtils.createZkClient();
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(WordCountApp.class.getName()).build(),
+ "cluster1", true, "localhost:2181");
+ S4Node.main(new String[] { "-cluster=cluster1", });
- Main.main(new String[] { "-cluster=cluster1", "-appClass=" + WordCountApp.class.getName(),
- "-extraModulesClasses=" + WordCountModule.class.getName() });
+ // we create the emitter now, it will share zk node assignment with the S4 node
+ createEmitter();
CountDownLatch signalTextProcessed = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
@@ -114,6 +110,8 @@ public class WordCountTest extends ZkBasedTest {
public void injectSentence(String sentence) throws IOException {
Event event = new Event();
event.put("sentence", String.class, sentence);
+
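+ // NOTE(review): the call below sends to partition 0; this note originally read "we send to partition 1
+ // since partition 0 hosts the emitter" -- confirm which partition is intended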
emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
.serialize(event)));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
index 1b9f310..3041a94 100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@ -21,6 +21,7 @@ package org.apache.s4.edsl;
import java.lang.reflect.Field;
import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.BaseModule;
import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.junit.Test;
@@ -35,9 +36,11 @@ public class TestEDSL extends ZkBasedTest {
@Test
public void test() throws Exception {
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = Guice
+ .createInjector(new BaseModule(Resources.getResource("default.s4.base.properties").openStream(),
+ "cluster1"), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+ .openStream()), new DefaultCoreModule(Resources.getResource("default.s4.core.properties")
+ .openStream()));
MyApp myApp = injector.getInstance(MyApp.class);
/* Normally, the container will handle this, but this is just a test. */
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 94e359c..e39d00f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -22,21 +22,25 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-import org.apache.zookeeper.CreateMode;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
import org.gradle.tooling.BuildLauncher;
import org.gradle.tooling.GradleConnector;
import org.gradle.tooling.ProgressListener;
import org.gradle.tooling.ProjectConnection;
import org.slf4j.LoggerFactory;
+import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.FileConverter;
+import com.beust.jcommander.internal.Maps;
import com.google.common.base.Strings;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
@@ -101,25 +105,12 @@ public class Deploy extends S4ArgsBase {
}
}
- final String uri = s4rToDeploy.toURI().toString();
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- record.putSimpleField("name", deployArgs.appName);
- String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
- if (zkClient.exists(deployedAppPath)) {
- ZNRecord readData = zkClient.readData(deployedAppPath);
- logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
- readData.getSimpleField("name"));
- System.exit(1);
- }
-
- zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/app/s4App", record, CreateMode.PERSISTENT);
- logger.info(
- "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
- new String[] { deployArgs.appName, deployArgs.clusterName,
- "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName,
- s4rToDeploy.getAbsolutePath() });
-
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder().appName(deployArgs.appName).appURI(s4rToDeploy.toURI().toString())
+ .customModulesNames(deployArgs.modulesClassesNames)
+ .customModulesURIs(deployArgs.modulesURIs)
+ .namedParameters(convertListArgsToMap(deployArgs.extraNamedParameters)).build(),
+ deployArgs.clusterName, false, deployArgs.zkConnectionString);
// Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
System.exit(0);
} catch (Exception e) {
@@ -128,6 +119,18 @@ public class Deploy extends S4ArgsBase {
}
+ /**
+  * Converts a list of <code>name=value</code> arguments into a map.
+  * <p>
+  * Only the first '=' separates the name from the value, so a value may itself contain '=' (for instance
+  * <code>url=http://host/path?key=val</code>), consistently with what InlineConfigParameterConverter accepts.
+  *
+  * @param args
+  *            arguments, each formatted as <code>name=value</code>
+  * @return map of parameter names to values; when a name appears several times, the last value wins
+  * @throws IllegalArgumentException
+  *             if an argument has no '=', an empty name or an empty value
+  */
+ private static Map<String, String> convertListArgsToMap(List<String> args) {
+     Map<String, String> result = Maps.newHashMap();
+     for (String arg : args) {
+         // split on the first '=' only, otherwise values containing '=' would be rejected
+         int separatorIndex = arg.indexOf('=');
+         if (separatorIndex <= 0 || separatorIndex == arg.length() - 1) {
+             throw new IllegalArgumentException("Invalid args: " + Arrays.toString(args.toArray(new String[] {})));
+         }
+         result.put(arg.substring(0, separatorIndex), arg.substring(separatorIndex + 1));
+     }
+     return result;
+ }
+
@Parameters(commandNames = "s4 deploy", commandDescription = "Package and deploy application to S4 cluster", separators = "=")
static class DeployAppArgs extends S4ArgsBase {
@@ -155,6 +158,33 @@ public class Deploy extends S4ArgsBase {
@Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
int timeout = 10000;
+ @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+ List<String> modulesURIs = new ArrayList<String>();
+
+ @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+ List<String> modulesClassesNames = new ArrayList<String>();
+
+ @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+ List<String> extraNamedParameters = new ArrayList<String>();
+
+ }
+
+ /**
+  * JCommander converter for inline configuration parameters (see the <code>-p</code> option).
+  * <p>
+  * Extracts the first whitespace-free <code>name=value</code> token found in the argument and returns it
+  * unchanged; the returned string is not split into name and value here.
+  */
+ public static class InlineConfigParameterConverter implements IStringConverter<String> {
+
+     /** Matches a whitespace-free <code>name=value</code> token; compiled once rather than on every call. */
+     private static final Pattern PARAMETER_PATTERN = Pattern.compile("(\\S+=\\S+)");
+
+     /**
+      * @param arg
+      *            raw command line value
+      * @return the first <code>name=value</code> token found in <code>arg</code>
+      * @throws IllegalArgumentException
+      *             if <code>arg</code> contains no such token
+      */
+     @Override
+     public String convert(String arg) {
+         logger.info("processing inline configuration parameter {}", arg);
+         Matcher parameterMatcher = PARAMETER_PATTERN.matcher(arg);
+         if (!parameterMatcher.find()) {
+             throw new IllegalArgumentException("Cannot understand parameter " + arg);
+         }
+         return parameterMatcher.group(1);
+     }
}
static class ExecGradle {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index dbd63b9..832cfcc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
-import org.apache.s4.core.Main;
+import org.apache.s4.core.S4Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +39,8 @@ public class Tools {
static Logger logger = LoggerFactory.getLogger(Tools.class);
enum Task {
- deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
- CreateApp.class), s4r(Package.class), status(Status.class);
+ deploy(Deploy.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(
+ null), newApp(CreateApp.class), s4r(Package.class), status(Status.class);
Class<?> target;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index ed71fb0..4893928 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -108,7 +108,7 @@ public class ZKServer {
+ "zookeeper" + File.separator + "data").getAbsolutePath();
@Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
- boolean clean = true;
+ boolean clean = false;
@Parameter(names = "-logDir", description = "log directory")
String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
|