Updated Branches:
refs/heads/dev fe9f32f35 -> 051082eb7
Merge branch 'S4-59' into dev
- includes a few manual fixes for the merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/051082eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/051082eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/051082eb
Branch: refs/heads/dev
Commit: 051082eb7603aac816ad226a64a31c3c84c24ed5
Parents: fe9f32f 1aa1e77
Author: Matthieu Morel <mmorel@apache.org>
Authored: Wed Jan 23 21:41:15 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Wed Jan 23 23:33:58 2013 +0100
----------------------------------------------------------------------
subprojects/s4-base/s4-base.gradle | 2 +-
.../org/apache/s4/base/util/ModulesLoader.java | 16 +
.../org/apache/s4/base/util/S4RLoaderFactory.java | 2 +-
.../src/main/resources/default.s4.base.properties | 4 +
.../java/org/apache/s4/comm/DefaultCommModule.java | 24 +--
.../org/apache/s4/comm/ModulesLoaderFactory.java | 112 +++++++
.../apache/s4/comm/util/ArchiveFetchException.java | 14 +
.../org/apache/s4/comm/util/ArchiveFetcher.java | 42 +++
.../s4/comm/util/FileSystemArchiveFetcher.java | 41 +++
.../apache/s4/comm/util/HttpArchiveFetcher.java | 187 +++++++++++
.../org/apache/s4/comm/util/RemoteFileFetcher.java | 23 ++
.../src/main/resources/default.s4.comm.properties | 4 -
.../java/org/apache/s4/comm/tcp/TCPBasicTest.java | 25 +-
.../java/org/apache/s4/comm/udp/UDPBasicTest.java | 22 +-
.../org/apache/s4/fixtures/TestCommModule.java | 38 +++
.../main/java/org/apache/s4/core/BaseModule.java | 80 +++++
.../java/org/apache/s4/core/DefaultCoreModule.java | 15 +
.../src/main/java/org/apache/s4/core/Main.java | 242 ---------------
.../main/java/org/apache/s4/core/S4Bootstrap.java | 235 ++++++++++++++
.../src/main/java/org/apache/s4/core/S4Node.java | 79 +++++
.../src/main/java/org/apache/s4/core/Server.java | 2 +-
.../s4/core/ft/DefaultFileSystemStateStorage.java | 22 +--
.../ft/FileSystemBackendCheckpointingModule.java | 28 ++
.../java/org/apache/s4/core/util/AppConfig.java | 165 ++++++++++
.../s4/core/util/ParametersInjectionModule.java | 3 +-
.../java/org/apache/s4/deploy/DeploymentUtils.java | 38 +++
.../s4/deploy/DistributedDeploymentManager.java | 63 ++--
.../org/apache/s4/deploy/FileSystemS4RFetcher.java | 41 ---
.../java/org/apache/s4/deploy/HttpS4RFetcher.java | 186 -----------
.../main/java/org/apache/s4/deploy/S4RFetcher.java | 42 ---
.../org/apache/s4/core/ft/CheckpointingTest.java | 4 +
.../org/apache/s4/core/ft/FTWordCountTest.java | 23 +-
...ndWithZKStorageCallbackCheckpointingModule.java | 8 +-
.../java/org/apache/s4/core/ft/RecoveryTest.java | 31 ++-
.../core/moduleloader/ModuleLoaderTestUtils.java | 110 +++++++
.../s4/core/moduleloader/TestModuleLoader.java | 13 +
.../core/moduleloader/TestModuleLoaderRemote.java | 25 ++
.../apache/s4/deploy/TestAutomaticDeployment.java | 21 +-
.../s4/deploy/prodcon/TestProducerConsumer.java | 19 +-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 19 +-
.../org/apache/s4/wordcount/WordClassifierPE.java | 2 +-
.../org/apache/s4/wordcount/WordCountTest.java | 39 +--
.../src/test/java/org/apache/s4/edsl/TestEDSL.java | 9 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 97 +++++--
.../src/main/java/org/apache/s4/tools/Status.java | 14 +-
.../src/main/java/org/apache/s4/tools/Tools.java | 6 +-
.../main/java/org/apache/s4/tools/ZKServer.java | 4 +-
.../src/main/resources/templates/newApp.README | 7 +-
.../s4-tools/src/main/resources/templates/s4 | 8 +-
.../test/java/org/apache/s4/tools/TestDeploy.java | 1 -
test-apps/custom-modules/README.txt | 1 +
test-apps/custom-modules/build.gradle | 116 +++++++
.../src/main/java/org/apache/s4/TestListener.java | 122 ++++++++
.../java/org/apache/s4/TestListenerModule.java | 14 +
test-apps/twitter-adapter/build.gradle | 2 +-
55 files changed, 1784 insertions(+), 728 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index a295290,2fa3e0e..9c37b4d
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@@ -30,11 -29,8 +29,9 @@@ import org.apache.s4.base.Hasher
import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
import org.apache.s4.comm.tcp.RemoteEmitters;
- import org.apache.s4.comm.topology.Assignment;
- import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.Clusters;
@@@ -88,15 -82,13 +83,12 @@@ public class DefaultCommModule extends
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
/* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
+ // classloader
+ install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+ SerializerDeserializerFactory.class));
- // a node holds a single partition assignment
- // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- bind(Assignment.class).to(AssignmentFromZK.class);
- // // a node holds a single partition assignment
- // // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- // bind(Assignment.class).to(AssignmentFromZK.class);
bind(Cluster.class).to(ClusterFromZK.class);
- // bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(Clusters.class).to(ClustersFromZK.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
index 0000000,4ee28d3..39b98d9
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
@@@ -1,0 -1,187 +1,187 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.s4.comm.util;
+
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileNotFoundException;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.InetSocketAddress;
+ import java.net.URI;
+ import java.util.concurrent.Executors;
+
+ import org.jboss.netty.bootstrap.ClientBootstrap;
+ import org.jboss.netty.buffer.ChannelBuffer;
+ import org.jboss.netty.buffer.ChannelBufferInputStream;
+ import org.jboss.netty.channel.Channel;
+ import org.jboss.netty.channel.ChannelFuture;
+ import org.jboss.netty.channel.ChannelHandlerContext;
+ import org.jboss.netty.channel.ChannelPipeline;
+ import org.jboss.netty.channel.ChannelPipelineFactory;
+ import org.jboss.netty.channel.Channels;
+ import org.jboss.netty.channel.MessageEvent;
+ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+ import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+ import org.jboss.netty.handler.codec.http.HttpChunk;
+ import org.jboss.netty.handler.codec.http.HttpClientCodec;
+ import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
+ import org.jboss.netty.handler.codec.http.HttpHeaders;
+ import org.jboss.netty.handler.codec.http.HttpMethod;
+ import org.jboss.netty.handler.codec.http.HttpRequest;
+ import org.jboss.netty.handler.codec.http.HttpResponse;
+ import org.jboss.netty.handler.codec.http.HttpVersion;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.io.ByteStreams;
+
+ /**
+ * <p>
+ * Fetches modules and app archives through HTTP.
+ * </p>
+ * <p>
+ * The underlying implementation uses Netty, and borrows code from the Netty snoop example.<br>
+ *
+ * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
+ * snoop example</a>
+ *
+ * </p>
+ */
+ public class HttpArchiveFetcher implements ArchiveFetcher {
+
+ private static Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
+
+ @Override
+ public InputStream fetch(URI uri) throws ArchiveFetchException {
+ logger.debug("Fetching file through http: {}", uri.toString());
+
+ String host = uri.getHost();
+ int port = uri.getPort();
+ if (port == -1) {
+ if (uri.getScheme().equalsIgnoreCase("http")) {
+ port = 80;
+ } else if (uri.getScheme().equalsIgnoreCase("https")) {
+ port = 443;
+ }
+ }
+
+ ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+ File tmpFile;
+ try {
+ tmpFile = File.createTempFile("http", "download");
+ } catch (IOException e) {
+ throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
+ e);
+ }
+ clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
+ ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
+ // TODO timeout?
+ Channel channel = channelFuture.awaitUninterruptibly().getChannel();
+ if (!channelFuture.isSuccess()) {
+ clientBootstrap.releaseExternalResources();
+ throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
+ channelFuture.getCause());
+ }
+
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
++ HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath());
+ request.setHeader(HttpHeaders.Names.HOST, host);
+ request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+ request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+ channel.write(request);
+
+ channel.getCloseFuture().awaitUninterruptibly();
+
+ clientBootstrap.releaseExternalResources();
+
+ logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
+ tmpFile.getAbsolutePath());
+ try {
+ return new FileInputStream(tmpFile);
+ } catch (FileNotFoundException e) {
+ throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
+ + tmpFile.getAbsolutePath() + "]");
+ }
+ }
+
+ private class HttpClientPipelineFactory implements ChannelPipelineFactory {
+
+ File tmpFile;
+
+ public HttpClientPipelineFactory(File tmpFile) {
+ this.tmpFile = tmpFile;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ pipeline.addLast("codec", new HttpClientCodec());
+
+ // Remove the following line if you don't want automatic content decompression.
+ pipeline.addLast("inflater", new HttpContentDecompressor());
+
+ pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
+ return pipeline;
+ }
+ }
+
+ // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
+ private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+
+ private boolean readingChunks;
+ FileOutputStream fos;
+
+ public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
+ this.fos = new FileOutputStream(tmpFile);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (!readingChunks) {
+ HttpResponse response = (HttpResponse) e.getMessage();
+
+ if (response.isChunked()) {
+ readingChunks = true;
+ } else {
+ copyContentToTmpFile(response.getContent());
+ }
+ } else {
+ HttpChunk chunk = (HttpChunk) e.getMessage();
+ if (chunk.isLast()) {
+ readingChunks = false;
+ fos.close();
+ } else {
+ copyContentToTmpFile(chunk.getContent());
+ }
+ }
+
+ }
+
+ private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
+ ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
+ ByteStreams.copy(cbis, fos);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index fad7ec4,bef0894..49ad568
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@@ -1,33 -1,5 +1,29 @@@
s4.comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
+
# I/O channel connection timeout, when applicable (e.g. used by netty)
s4.comm.timeout=1000
- s4.cluster.zk_address = localhost:2181
- s4.cluster.zk_session_timeout = 10000
- s4.cluster.zk_connection_timeout = 10000
-
+
+# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure
+
+# how many threads to use for the sender stage (i.e. serialization)
+s4.sender.parallelism=1
+# maximum number of events in the buffer of the sender stage
+s4.sender.workQueueSize=10000
+# maximum sending rate from a given node, in events / s (used with throttling sender executors)
+s4.sender.maxRate=200000
+
+# how many threads to use for the *remote* sender stage (i.e. serialization)
+s4.remoteSender.parallelism=1
+# maximum number of events in the buffer of the *remote* sender stage
+s4.remoteSender.workQueueSize=10000
+# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
+s4.remoteSender.maxRate=200000
+
+# maximum number of pending writes to a given comm channel
+s4.emitter.maxPendingWrites=1000
+
+# maximum number of events in the buffer of the processing stage
+s4.stream.workQueueSize=10000
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index a4bd8a2,0000000..e400813
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@@ -1,45 -1,0 +1,54 @@@
+package org.apache.s4.comm.tcp;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
- import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockReceiverModule;
+import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.TCPTransportModule;
++import org.apache.s4.fixtures.TestCommModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
++import com.google.inject.util.Modules;
+
+public class TCPBasicTest extends ZkBasedTest {
+
++ public TCPBasicTest() {
++ super(2);
++ }
++
+ @Test
+ public void testSingleMessage() throws Exception {
+
- Injector injector1 = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new TCPTransportModule(), new NoOpReceiverModule());
++ Injector injector1 = Guice.createInjector(Modules.override(
++ new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++ new TCPTransportModule(), new NoOpReceiverModule()));
++
++ // this node picks partition 0
+ Emitter emitter = injector1.getInstance(Emitter.class);
+
- Injector injector2 = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new TCPTransportModule(), new MockReceiverModule());
++ Injector injector2 = Guice.createInjector(Modules.override(
++ new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++ new TCPTransportModule(), new MockReceiverModule()));
++
+ // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+ // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
+ injector2.getInstance(Listener.class);
+
- emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
++ // send to the other node
++ emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+ // check receiver got the message
+ Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index 47cb39c,0000000..b06734b
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@@ -1,42 -1,0 +1,52 @@@
+package org.apache.s4.comm.udp;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
- import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockReceiverModule;
+import org.apache.s4.fixtures.NoOpReceiverModule;
++import org.apache.s4.fixtures.TestCommModule;
+import org.apache.s4.fixtures.UDPTransportModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
++import com.google.inject.util.Modules;
+
+public class UDPBasicTest extends ZkBasedTest {
+
++ public UDPBasicTest() {
++ super(2);
++ }
++
+ @Test
+ public void testSingleMessage() throws Exception {
+
- Injector injector1 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
- .openStream(), "cluster1"), new UDPTransportModule(), new NoOpReceiverModule());
++ Injector injector1 = Guice.createInjector(Modules.override(
++ new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++ new UDPTransportModule(), new NoOpReceiverModule()));
++ // this picks partition 0
+ Emitter emitter = injector1.getInstance(Emitter.class);
+
- Injector injector2 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
- .openStream(), "cluster1"), new UDPTransportModule(), new MockReceiverModule());
++ Injector injector2 = Guice.createInjector(Modules.override(
++ new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++ new UDPTransportModule(), new MockReceiverModule()));
++
+ // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+ // listener)
+ injector2.getInstance(Listener.class);
+
- emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
++ // send to the other partition (1)
++ emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+ Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 56d2f35,1f07bf6..7789b9f
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@@ -84,15 -87,19 +88,26 @@@ public class DefaultCoreModule extends
// For enabling checkpointing, one needs to use a custom module, such as
// org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+ // shed load in local sender only by default
+ bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
+ bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
+
+ bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+
}
+ @Provides
+ @Named("s4.tmp.dir")
+ public File provideTmpDir() {
+ File tmpS4Dir = Files.createTempDir();
+ tmpS4Dir.deleteOnExit();
+ logger.warn(
+ "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+ tmpS4Dir.getAbsolutePath());
+ return tmpS4Dir;
+ }
+
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 0000000,e469121..77dbc67
mode 000000,100644..100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@@ -1,0 -1,228 +1,235 @@@
+ package org.apache.s4.core;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.atomic.AtomicBoolean;
+
+ import org.I0Itec.zkclient.IZkDataListener;
+ import org.I0Itec.zkclient.serialize.ZkSerializer;
+ import org.apache.s4.base.util.ModulesLoader;
+ import org.apache.s4.comm.DefaultCommModule;
+ import org.apache.s4.comm.ModulesLoaderFactory;
+ import org.apache.s4.comm.topology.ZNRecord;
+ import org.apache.s4.comm.topology.ZNRecordSerializer;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.comm.util.ArchiveFetchException;
+ import org.apache.s4.comm.util.ArchiveFetcher;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.core.util.ParametersInjectionModule;
+ import org.apache.zookeeper.CreateMode;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.base.Strings;
+ import com.google.common.io.ByteStreams;
+ import com.google.common.io.Files;
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Inject;
+ import com.google.inject.Injector;
+ import com.google.inject.Module;
+ import com.google.inject.name.Named;
+ import com.google.inject.util.Modules;
+ import com.google.inject.util.Modules.OverriddenModuleBuilder;
+
+ /**
+ * This is the bootstrap for S4 nodes.
+ * <p>
+ * Its roles are to:
+ * <ul>
+ * <li>register within the S4 cluster (and acquire a partition).
+ * <li>look for an application deployed on the S4 cluster
+ * </ul>
+ * <p>
+ * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+ * application code is then downloaded and the app started.
+ * <p>
+ * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+ * application classes are available in the classpath.
+ *
+ *
+ *
+ */
+ public class S4Bootstrap {
+ private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
+
+ private final ZkClient zkClient;
+ private final String appPath;
+ private final AtomicBoolean deployed = new AtomicBoolean(false);
+
+ private final ArchiveFetcher fetcher;
+
+ private Injector parentInjector;
+
+ CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+
+ @Inject
+ public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
+
+ this.fetcher = fetcher;
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ ZkSerializer serializer = new ZNRecordSerializer();
+ zkClient.setZkSerializer(serializer);
+
+ String appDir = "/s4/clusters/" + clusterName + "/app";
+ if (!zkClient.exists(appDir)) {
+ zkClient.create(appDir, null, CreateMode.PERSISTENT);
+ }
+ appPath = appDir + "/s4App";
+ zkClient.subscribeDataChanges(appPath, new AppChangeListener());
+ }
+
+ public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
+ this.parentInjector = parentInjector;
+ if (zkClient.exists(appPath)) {
+ if (!deployed.get()) {
+ loadModulesAndStartNode(parentInjector);
+ }
+ }
+
+ signalOneAppLoaded.await();
+ }
+
+ private void loadModulesAndStartNode(final Injector parentInjector) throws ArchiveFetchException {
+
+ final ZNRecord appData = zkClient.readData(appPath);
+ // appData can be null
+ final AppConfig appConfig = new AppConfig(appData);
+
+ String appName = appConfig.getAppName();
+
+ List<File> modulesLocalCopies = new ArrayList<File>();
+
+ for (String uriString : appConfig.getCustomModulesURIs()) {
+ modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+ }
+ final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ // load app class through modules classloader and start it
+ S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+ signalOneAppLoaded.countDown();
+ }
+ }, "S4 platform loader");
+ t.start();
+
+ }
+
+ private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+
+ URI uri;
+ try {
+ uri = new URI(uriString);
+ } catch (URISyntaxException e2) {
+ throw new ArchiveFetchException("Invalid module URI : [" + uriString + "]", e2);
+ }
+ File localModuleFileCopy;
+ try {
+ localModuleFileCopy = File.createTempFile("tmp", "module");
+ } catch (IOException e1) {
+ logger.error(
+ "Cannot deploy app [{}] because a local copy of the module file could not be initialized due to [{}]",
+ appName, e1.getClass().getName() + "->" + e1.getMessage());
+ throw new ArchiveFetchException("Cannot deploy application [" + appName + "]", e1);
+ }
+ localModuleFileCopy.deleteOnExit();
+ try {
+ if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localModuleFileCopy)) == 0) {
+ throw new ArchiveFetchException("Cannot copy archive from [" + uri.toString() + "] to ["
+ + localModuleFileCopy.getAbsolutePath() + "] (nothing was copied)");
+ }
+ } catch (Exception e) {
+ throw new ArchiveFetchException("Cannot deploy application [" + appName + "] from URI [" + uri.toString()
+ + "] ", e);
+ }
+ return localModuleFileCopy;
+ }
+
+ public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+ try {
+ Injector injector;
+ InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+ InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+
+ logger.info("Initializing S4 app with : {}", appConfig.toString());
+
+ AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
+ AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
+
+ List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+ for (String moduleClass : appConfig.getCustomModulesNames()) {
+ extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+ }
+ Module combinedModule = Modules.combine(commModule, coreModule);
+ if (extraModules.size() > 0) {
+ OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+ combinedModule = overridenModuleBuilder.with(extraModules);
+ }
+
+ if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+
+ logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+ Map<String, String> namedParameters = new HashMap<String, String>();
+
+ namedParameters.putAll(appConfig.getNamedParameters());
+ combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+ }
+
- injector = parentInjector.createChildInjector(combinedModule);
-
+ if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
++ // In that case we won't be using an S4R classloader, app classes are available from the current
++ // classloader
++ // The app module provides bindings specific to the app class loader, in this case the current thread's
++ // class loader.
++ AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
++ // NOTE: the app module is passed as the base so that its bindings can be overridden
++ combinedModule = Modules.override(appModule).with(combinedModule);
++ injector = parentInjector.createChildInjector(combinedModule);
+ logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+ App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+ app.init();
+ app.start();
+ } else {
++ injector = parentInjector.createChildInjector(combinedModule);
+ if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+ logger.info("S4 node in standby until app class or app URI is specified");
+ }
+ Server server = injector.getInstance(Server.class);
+ server.start(injector);
+ }
+ } catch (Exception e) {
+ logger.error("Cannot start S4 node", e);
+ System.exit(1);
+ }
+ }
+
+ class AppChangeListener implements IZkDataListener {
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ if (!deployed.get()) {
+ loadModulesAndStartNode(parentInjector);
+ deployed.set(true);
+ }
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ logger.error("Application undeployment is not supported yet");
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index 3e2d617,5178c97..4ce4693
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@@ -29,6 -39,23 +39,24 @@@ public class FileSystemBackendCheckpoin
protected void configure() {
bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
bind(CheckpointingFramework.class).to(SafeKeeper.class);
+
}
+
+ @Provides
+ @Named("s4.checkpointing.filesystem.storageRootPath")
+ public String provideStorageRootPath() {
+ File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+ + "storage");
+ String storageRootPath = defaultStorageDir.getAbsolutePath();
+ logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+ if (!defaultStorageDir.exists()) {
+ if (!(defaultStorageDir.mkdirs())) {
+ logger.error("Storage directory not specified, and cannot create default storage directory : "
+ + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+ }
+ }
+ return storageRootPath;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 2b3f8c7,8d3d324..50846b4
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@@ -25,8 -25,11 +25,10 @@@ import java.util.concurrent.TimeUnit
import junit.framework.Assert;
import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index da2bf5d,2f4f742..4dc9484
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@@ -27,9 -27,13 +27,12 @@@ import junit.framework.Assert
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
+ import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index 0000000,3b95334..e5d0d01
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@@ -1,0 -1,109 +1,110 @@@
+ package org.apache.s4.core.moduleloader;
+
+ import java.io.File;
+ import java.util.List;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+
+ import junit.framework.Assert;
+
+ import org.apache.s4.base.Emitter;
+ import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.SerializerDeserializer;
+ import org.apache.s4.comm.DefaultCommModule;
++import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+ import org.apache.s4.comm.tcp.TCPEmitter;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.core.BaseModule;
+ import org.apache.s4.core.S4Node;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
+ import org.apache.s4.fixtures.CommTestUtils;
+ import org.apache.s4.fixtures.CoreTestUtils;
+ import org.apache.s4.wordcount.WordCountApp;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.ZooKeeper;
+
+ import com.beust.jcommander.internal.Lists;
+ import com.google.common.base.Strings;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.io.Resources;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+
+ public class ModuleLoaderTestUtils {
+
+ private static final int NB_MESSAGES = 10;
+
+ public static Process testModuleLoader(boolean fork) throws Exception {
+
+ Process forkedS4Node = null;
+ // build custom-modules.jar
+ File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build.gradle"), "clean", new String[0]);
+
+ File modulesJarFile = new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build/libs/app/custom-modules.jar");
+
+ Assert.assertFalse(modulesJarFile.exists());
+
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/custom-modules/build.gradle"), "jar", new String[0]);
+
+ // make sure it is created
+ Assert.assertTrue(modulesJarFile.exists());
+
+ // pass it as a configuration
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder().appClassName(WordCountApp.class.getName())
+ .customModulesURIs(ImmutableList.of(modulesJarFile.toURI().toString()))
+ .customModulesNames(ImmutableList.of("org.apache.s4.TestListenerModule")).build(), "cluster1",
+ true, "localhost:2181");
+ if (fork) {
+ forkedS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+ } else {
+ S4Node.main(new String[] { "-c", "cluster1" });
+ }
+
+ Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
+ .openStream(), "cluster1"), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+ .openStream()));
+
+ Emitter emitter = injector.getInstance(TCPEmitter.class);
+ List<Long> messages = Lists.newArrayList();
+ for (int i = 0; i < NB_MESSAGES; i++) {
+ messages.add(System.currentTimeMillis());
+ }
+
+ ZkClient zkClient = new ZkClient("localhost:2181");
+ zkClient.create("/test", 0, CreateMode.PERSISTENT);
+
+ final ZooKeeper zk = CommTestUtils.createZkClient();
+ final CountDownLatch signalMessagesReceived = new CountDownLatch(1);
+
+ // watch for last message in test data sequence
+ CoreTestUtils.watchAndSignalCreation("/test/data" + Strings.padStart(String.valueOf(NB_MESSAGES - 1), 10, '0'),
+ signalMessagesReceived, zk);
+
++ SerializerDeserializer serDeser = injector.getInstance(SerializerDeserializerFactory.class)
++ .createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+ for (Long message : messages) {
+ Event event = new Event();
+ event.put("message", long.class, message);
- emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
- .serialize(event)));
++ event.setStreamId("inputStream");
++ emitter.send(0, serDeser.serialize(event));
+ }
+
+ // check sequential nodes in zk with correct data
+ Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+ List<String> children = zkClient.getChildren("/test");
+ for (String child : children) {
+ Long data = zkClient.readData("/test/" + child);
+ Assert.assertTrue(messages.contains(data));
+ }
+
+ return forkedS4Node;
+
+ }
-
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
index 0000000,a1ee016..da1f62b
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
@@@ -1,0 -1,18 +1,13 @@@
+ package org.apache.s4.core.moduleloader;
+
+ import org.apache.s4.fixtures.ZkBasedTest;
+ import org.junit.Test;
+
+ public class TestModuleLoader extends ZkBasedTest {
+
- public TestModuleLoader() {
- // need 2 partitions: 1 for the test emitter, 1 for the S4 node
- super(2);
- }
-
+ @Test
+ public void testLocal() throws Exception {
+ ModuleLoaderTestUtils.testModuleLoader(false);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 3549e25,d2e2c6a..8b5329b
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@@ -30,17 -34,17 +30,16 @@@ import org.I0Itec.zkclient.IZkChildList
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
- import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
+ import org.apache.s4.core.util.AppConfig;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.S4RHttpServer;
import org.apache.s4.fixtures.ZkBasedTest;
- import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -105,12 -112,11 +104,11 @@@ public class TestAutomaticDeployment ex
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
CommTestUtils.createZkClient());
- DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true, "localhost:2181");
-
- // ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- // record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- // zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
+ if (createZkAppNode) {
+ // otherwise we need to do that through a separate tool
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
++ DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true,
++ "localhost:2181");
+ }
Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@@ -193,33 -204,16 +192,23 @@@
}
- // @Before
- // public void clean() throws Exception {
- // final ZooKeeper zk = CommTestUtils.createZkClient();
- // try {
- // zk.delete("/simpleAppCreated", -1);
- // } catch (Exception ignored) {
- // }
- //
- // zk.close();
- // }
+ public static void checkNoAppAlreadyDeployed(ZkClient zkClient) {
+ List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
+ Assert.assertTrue(processes.size() == 0);
+ final CountDownLatch signalProcessesReady = new CountDownLatch(1);
+
+ zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 2) {
+ signalProcessesReady.countDown();
+ }
+
+ }
+ });
- }
+
- // @Before
- // public void clean() throws Exception {
- // final ZooKeeper zk = CommTestUtils.createZkClient();
- // try {
- // zk.delete("/simpleAppCreated", -1);
- // } catch (Exception ignored) {
- // }
- //
- // zk.close();
- // }
++ }
@After
public void cleanup() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 8ee58e5,f322256..5b4f596
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@@ -25,12 -25,15 +25,14 @@@ import java.util.concurrent.TimeUnit
import junit.framework.Assert;
import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.TCPEmitter;
- import org.apache.s4.core.AppModule;
+ import org.apache.s4.core.BaseModule;
import org.apache.s4.core.DefaultCoreModule;
- import org.apache.s4.core.Main;
+ import org.apache.s4.core.S4Node;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.CreateMode;
@@@ -111,14 -107,13 +106,16 @@@ public class WordCountTest extends ZkBa
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
}
- public void injectSentence(String sentence) throws IOException {
+ public void injectSentence(String sentence) throws IOException, InterruptedException {
Event event = new Event();
+ event.setStreamId("inputStream");
event.put("sentence", String.class, sentence);
+
- // NOTE: we send to partition 1 since partition 0 hosts the emitter
- emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
- .serialize(event)));
++ // NOTE: we send to partition 0 since partition 1 hosts the emitter
+ emitter.send(
+ 0,
+ injector.getInstance(SerializerDeserializerFactory.class)
- .createSerializerDeserializer(getClass().getClassLoader()).serialize(event));
++ .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index ac8e214,69ebaa6..76a1d43
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@@ -67,18 -70,23 +71,21 @@@ public class Deploy extends S4ArgsBase
System.exit(1);
}
- URI s4rURI;
- File s4rToDeploy = null;
++ URI s4rURI = null;
if (deployArgs.s4rPath != null) {
- // 1. if there is an application archive, we use it
- s4rToDeploy = new File(deployArgs.s4rPath);
- if (!s4rToDeploy.exists()) {
- logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
- System.exit(1);
- } else {
- logger.info(
- "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
- s4rToDeploy.getAbsolutePath());
+ s4rURI = new URI(deployArgs.s4rPath);
- // if (!s4rToDeploy.exists()) {
- // logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
- // System.exit(1);
- // } else {
++ if (Strings.isNullOrEmpty(s4rURI.getScheme())) {
++ // default is file
++ s4rURI = new File(deployArgs.s4rPath).toURI();
+ }
+ logger.info(
+ "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+ s4rURI.toString());
- } else {
+ } else if (deployArgs.gradleBuildFile != null) {
+
+ // 2. otherwise if there is a build file, we create the S4R archive from that
+
List<String> params = new ArrayList<String>();
// prepare gradle -P parameters, including passed gradle opts
params.addAll(deployArgs.gradleOpts);
@@@ -97,31 -105,29 +104,31 @@@
System.exit(1);
}
} else {
- s4rToDeploy = tmpS4R;
+ s4rURI = tmpS4R.toURI();
}
- }
+ } else {
+ if (!Strings.isNullOrEmpty(deployArgs.appClass)) {
+ // 3. otherwise if there is at least an app class specified (e.g. for running "s4 adapter"), we use
+ // it and won't use an S4R
+ logger.info("No S4R path specified, nor build file specified: this assumes the app is in the classpath");
+ } else {
+ logger.error("You must specify an S4R file, a build file to create an S4R from, or an appClass that will be in the classpath");
+ System.exit(1);
+ }
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, s4rURI.toString());
- record.putSimpleField("name", deployArgs.appName);
- String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
- if (zkClient.exists(deployedAppPath)) {
- ZNRecord readData = zkClient.readData(deployedAppPath);
- logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
- readData.getSimpleField("name"));
- System.exit(1);
}
- zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/app/s4App", record, CreateMode.PERSISTENT);
- logger.info(
- "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
- new String[] { deployArgs.appName, deployArgs.clusterName,
- "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName, s4rURI.toString() });
-
+ DeploymentUtils.initAppConfig(
+ new AppConfig.Builder().appName(deployArgs.appName)
- .appURI(s4rToDeploy != null ? s4rToDeploy.toURI().toString() : null)
- .appClassName(deployArgs.appClass).customModulesNames(deployArgs.modulesClassesNames)
- .customModulesURIs(deployArgs.modulesURIs)
++ .appURI(s4rURI == null ? null : s4rURI.toString())
++ .customModulesNames(deployArgs.modulesClassesNames)
++ .customModulesURIs(deployArgs.modulesURIs).appClassName(deployArgs.appClass)
+ .namedParameters(convertListArgsToMap(deployArgs.extraNamedParameters)).build(),
+ deployArgs.clusterName, false, deployArgs.zkConnectionString);
// Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
- System.exit(0);
+ if (!deployArgs.testMode) {
+ System.exit(0);
+ }
} catch (Exception e) {
LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
}
@@@ -155,9 -173,33 +174,35 @@@
@Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
int timeout = 10000;
+ @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+ List<String> modulesURIs = new ArrayList<String>();
+
+ @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+ List<String> modulesClassesNames = new ArrayList<String>();
+
+ @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+ List<String> extraNamedParameters = new ArrayList<String>();
+
+ @Parameter(names = "-testMode", description = "Special mode for regression testing", hidden = true)
+ boolean testMode = false;
+ }
+ /**
+ * Parameters parsing utility.
+ *
+ */
+ public static class InlineConfigParameterConverter implements IStringConverter<String> {
+
+ @Override
+ public String convert(String arg) {
+ Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
+ logger.info("processing inline configuration parameter {}", arg);
+ Matcher parameterMatcher = parameterPattern.matcher(arg);
+ if (!parameterMatcher.find()) {
+ throw new IllegalArgumentException("Cannot understand parameter " + arg);
+ }
+ return parameterMatcher.group(1);
+ }
}
static class ExecGradle {
@@@ -198,7 -240,7 +243,5 @@@
connection.close();
}
}
--
}
--
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
index b9f3b5d,0000000..cff7359
mode 100644,000000..100644
--- a/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
+++ b/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
@@@ -1,109 -1,0 +1,108 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.AppConstants;
+import org.apache.s4.deploy.TestAutomaticDeployment;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.S4RHttpServer;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class TestDeploy extends ZkBasedTest {
+
+ private Process forkedNode;
+ private S4RHttpServer s4rHttpServer;
+
+ @Test
+ public void testDeployment() throws Exception {
+ TestAutomaticDeployment.createS4RFiles();
+
+ ZkClient zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ final CountDownLatch signalNodeReady = new CountDownLatch(1);
+
+ zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 1) {
+ signalNodeReady.countDown();
+ }
+
+ }
+ });
+
+ TestAutomaticDeployment.checkNoAppAlreadyDeployed(zkClient);
+
+ forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
+
+ Assert.assertTrue(signalNodeReady.await(10, TimeUnit.SECONDS));
+
+ // deploy app
+
+ Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+
+ File tmpDir = Files.createTempDir();
+
+ File s4rToDeploy = new File(tmpDir, String.valueOf(System.currentTimeMillis()));
+
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(TestAutomaticDeployment.tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ s4rHttpServer = new S4RHttpServer(8080, tmpDir);
+ s4rHttpServer.start();
+
- // Deploy
+ Deploy.main(new String[] { "-s4r", "http://localhost:8080/s4/" + s4rToDeploy.getName(), "-c", "cluster1",
+ "-appName", "toto", "-testMode" });
+
+ TestAutomaticDeployment.assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), zkClient, false);
+
+ // check resource loading (we use a zkclient without custom serializer)
+ ZkClient client2 = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
+ Assert.assertEquals("Salut!", client2.readData("/resourceData"));
+
+ }
+
+ @After
+ public void cleanup() throws IOException, InterruptedException {
+ CoreTestUtils.killS4App(forkedNode);
+ if (s4rHttpServer != null) {
+ s4rHttpServer.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
----------------------------------------------------------------------
diff --cc test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
index 0000000,3451a28..c3916f0
mode 000000,100644..100644
--- a/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
+++ b/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
@@@ -1,0 -1,136 +1,122 @@@
+ package org.apache.s4;
+
+ import java.net.InetSocketAddress;
+ import java.util.concurrent.Executors;
+
+ import org.apache.commons.lang.NotImplementedException;
+ import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.Listener;
+ import org.apache.s4.base.SerializerDeserializer;
+ import org.apache.s4.comm.topology.Assignment;
+ import org.apache.s4.comm.topology.ClusterNode;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.jboss.netty.bootstrap.ServerBootstrap;
+ import org.jboss.netty.buffer.ChannelBuffer;
+ import org.jboss.netty.channel.Channel;
+ import org.jboss.netty.channel.ChannelFactory;
+ import org.jboss.netty.channel.ChannelFuture;
+ import org.jboss.netty.channel.ChannelFutureListener;
+ import org.jboss.netty.channel.ChannelHandlerContext;
+ import org.jboss.netty.channel.ChannelPipeline;
+ import org.jboss.netty.channel.ChannelPipelineFactory;
+ import org.jboss.netty.channel.Channels;
+ import org.jboss.netty.channel.ExceptionEvent;
+ import org.jboss.netty.channel.MessageEvent;
+ import org.jboss.netty.channel.SimpleChannelHandler;
+ import org.jboss.netty.channel.group.ChannelGroup;
+ import org.jboss.netty.channel.group.DefaultChannelGroup;
+ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+ import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.inject.Inject;
+ import com.google.inject.name.Named;
+
+ public class TestListener implements Listener {
+
+ private static Logger logger = LoggerFactory.getLogger(TestListener.class);
+
+ private ServerBootstrap bootstrap;
+ private final ChannelGroup channels = new DefaultChannelGroup();
+ private ZkClient zkClient;
+ private SerializerDeserializer serDeser;
+
+ @Inject
+ public TestListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, SerializerDeserializer serDeser) {
+ // wait for an assignment
+ ClusterNode node = assignment.assignClusterNode();
+ this.serDeser = serDeser;
+
+ ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ bootstrap = new ServerBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
+ p.addLast("2", new MyChannelHandler());
+
+ return p;
+ }
+ });
+
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("child.reuseAddress", true);
+ bootstrap.setOption("child.connectTimeoutMillis", timeout);
+ bootstrap.setOption("readWriteFair", true);
+
+ Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
+ channels.add(c);
+
+ zkClient = new ZkClient("localhost:2181");
+ }
+
+ @Override
- public byte[] recv() {
- while (true) {
- try {
- Thread.sleep(100000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- @Override
+ public void close() {
+ try {
+ channels.close().await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ bootstrap.releaseExternalResources();
+
+ }
+
+ @Override
+ public int getPartitionId() {
+ throw new NotImplementedException();
+ }
+
+ public class MyChannelHandler extends SimpleChannelHandler {
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ channels.add(e.getChannel());
+ ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
- Event event = (Event) serDeser.deserialize(((EventMessage) serDeser.deserialize(buffer.array()))
- .getSerializedEvent());
++ Event event = (Event) serDeser.deserialize(buffer.toByteBuffer());
+
+ zkClient.createEphemeralSequential("/test/data", event.get("message", long.class));
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+ logger.error("Error", event.getCause());
+ Channel c = context.getChannel();
+ c.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess())
+ channels.remove(future.getChannel());
+ else
+ logger.error("FAILED to close channel");
+ }
+ });
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
|