incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [7/7] git commit: Merge branch 'S4-59' into dev - includes a few manual fixes for the merge
Date Wed, 23 Jan 2013 22:35:05 GMT
Updated Branches:
  refs/heads/dev fe9f32f35 -> 051082eb7


Merge branch 'S4-59' into dev
- includes a few manual fixes for the merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/051082eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/051082eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/051082eb

Branch: refs/heads/dev
Commit: 051082eb7603aac816ad226a64a31c3c84c24ed5
Parents: fe9f32f 1aa1e77
Author: Matthieu Morel <mmorel@apache.org>
Authored: Wed Jan 23 21:41:15 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Wed Jan 23 23:33:58 2013 +0100

----------------------------------------------------------------------
 subprojects/s4-base/s4-base.gradle                 |    2 +-
 .../org/apache/s4/base/util/ModulesLoader.java     |   16 +
 .../org/apache/s4/base/util/S4RLoaderFactory.java  |    2 +-
 .../src/main/resources/default.s4.base.properties  |    4 +
 .../java/org/apache/s4/comm/DefaultCommModule.java |   24 +--
 .../org/apache/s4/comm/ModulesLoaderFactory.java   |  112 +++++++
 .../apache/s4/comm/util/ArchiveFetchException.java |   14 +
 .../org/apache/s4/comm/util/ArchiveFetcher.java    |   42 +++
 .../s4/comm/util/FileSystemArchiveFetcher.java     |   41 +++
 .../apache/s4/comm/util/HttpArchiveFetcher.java    |  187 +++++++++++
 .../org/apache/s4/comm/util/RemoteFileFetcher.java |   23 ++
 .../src/main/resources/default.s4.comm.properties  |    4 -
 .../java/org/apache/s4/comm/tcp/TCPBasicTest.java  |   25 +-
 .../java/org/apache/s4/comm/udp/UDPBasicTest.java  |   22 +-
 .../org/apache/s4/fixtures/TestCommModule.java     |   38 +++
 .../main/java/org/apache/s4/core/BaseModule.java   |   80 +++++
 .../java/org/apache/s4/core/DefaultCoreModule.java |   15 +
 .../src/main/java/org/apache/s4/core/Main.java     |  242 ---------------
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |  235 ++++++++++++++
 .../src/main/java/org/apache/s4/core/S4Node.java   |   79 +++++
 .../src/main/java/org/apache/s4/core/Server.java   |    2 +-
 .../s4/core/ft/DefaultFileSystemStateStorage.java  |   22 +--
 .../ft/FileSystemBackendCheckpointingModule.java   |   28 ++
 .../java/org/apache/s4/core/util/AppConfig.java    |  165 ++++++++++
 .../s4/core/util/ParametersInjectionModule.java    |    3 +-
 .../java/org/apache/s4/deploy/DeploymentUtils.java |   38 +++
 .../s4/deploy/DistributedDeploymentManager.java    |   63 ++--
 .../org/apache/s4/deploy/FileSystemS4RFetcher.java |   41 ---
 .../java/org/apache/s4/deploy/HttpS4RFetcher.java  |  186 -----------
 .../main/java/org/apache/s4/deploy/S4RFetcher.java |   42 ---
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    4 +
 .../org/apache/s4/core/ft/FTWordCountTest.java     |   23 +-
 ...ndWithZKStorageCallbackCheckpointingModule.java |    8 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |   31 ++-
 .../core/moduleloader/ModuleLoaderTestUtils.java   |  110 +++++++
 .../s4/core/moduleloader/TestModuleLoader.java     |   13 +
 .../core/moduleloader/TestModuleLoaderRemote.java  |   25 ++
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   21 +-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   19 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   19 +-
 .../org/apache/s4/wordcount/WordClassifierPE.java  |    2 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   39 +--
 .../src/test/java/org/apache/s4/edsl/TestEDSL.java |    9 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   97 +++++--
 .../src/main/java/org/apache/s4/tools/Status.java  |   14 +-
 .../src/main/java/org/apache/s4/tools/Tools.java   |    6 +-
 .../main/java/org/apache/s4/tools/ZKServer.java    |    4 +-
 .../src/main/resources/templates/newApp.README     |    7 +-
 .../s4-tools/src/main/resources/templates/s4       |    8 +-
 .../test/java/org/apache/s4/tools/TestDeploy.java  |    1 -
 test-apps/custom-modules/README.txt                |    1 +
 test-apps/custom-modules/build.gradle              |  116 +++++++
 .../src/main/java/org/apache/s4/TestListener.java  |  122 ++++++++
 .../java/org/apache/s4/TestListenerModule.java     |   14 +
 test-apps/twitter-adapter/build.gradle             |    2 +-
 55 files changed, 1784 insertions(+), 728 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index a295290,2fa3e0e..9c37b4d
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@@ -30,11 -29,8 +29,9 @@@ import org.apache.s4.base.Hasher
  import org.apache.s4.base.RemoteEmitter;
  import org.apache.s4.base.SerializerDeserializer;
  import org.apache.s4.comm.serialize.KryoSerDeser;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 +import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
  import org.apache.s4.comm.tcp.RemoteEmitters;
- import org.apache.s4.comm.topology.Assignment;
- import org.apache.s4.comm.topology.AssignmentFromZK;
  import org.apache.s4.comm.topology.Cluster;
  import org.apache.s4.comm.topology.ClusterFromZK;
  import org.apache.s4.comm.topology.Clusters;
@@@ -88,15 -82,13 +83,12 @@@ public class DefaultCommModule extends 
          /* The hashing function to map keys to partitions. */
          bind(Hasher.class).to(DefaultHasher.class);
          /* Use Kryo to serialize events. */
 -        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 +        // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
 +        // classloader
 +        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
 +                SerializerDeserializerFactory.class));
  
-         // a node holds a single partition assignment
-         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-         bind(Assignment.class).to(AssignmentFromZK.class);
 -        // // a node holds a single partition assignment
 -        // // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
 -        // bind(Assignment.class).to(AssignmentFromZK.class);
          bind(Cluster.class).to(ClusterFromZK.class);
 -        // bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
  
          bind(Clusters.class).to(ClustersFromZK.class);
  

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
index 0000000,4ee28d3..39b98d9
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
@@@ -1,0 -1,187 +1,187 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.s4.comm.util;
+ 
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileNotFoundException;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.InetSocketAddress;
+ import java.net.URI;
+ import java.util.concurrent.Executors;
+ 
+ import org.jboss.netty.bootstrap.ClientBootstrap;
+ import org.jboss.netty.buffer.ChannelBuffer;
+ import org.jboss.netty.buffer.ChannelBufferInputStream;
+ import org.jboss.netty.channel.Channel;
+ import org.jboss.netty.channel.ChannelFuture;
+ import org.jboss.netty.channel.ChannelHandlerContext;
+ import org.jboss.netty.channel.ChannelPipeline;
+ import org.jboss.netty.channel.ChannelPipelineFactory;
+ import org.jboss.netty.channel.Channels;
+ import org.jboss.netty.channel.MessageEvent;
+ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+ import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+ import org.jboss.netty.handler.codec.http.HttpChunk;
+ import org.jboss.netty.handler.codec.http.HttpClientCodec;
+ import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
+ import org.jboss.netty.handler.codec.http.HttpHeaders;
+ import org.jboss.netty.handler.codec.http.HttpMethod;
+ import org.jboss.netty.handler.codec.http.HttpRequest;
+ import org.jboss.netty.handler.codec.http.HttpResponse;
+ import org.jboss.netty.handler.codec.http.HttpVersion;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.io.ByteStreams;
+ 
+ /**
+  * <p>
+  * Fetches modules and app archives through HTTP.
+  * </p>
+  * <p>
+  * The underlying implementation uses Netty, and borrows code from the Netty snoop example.<br/>
+  * 
+  * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
+  *      snoop example</a>
+  * 
+  *      </p>
+  */
+ public class HttpArchiveFetcher implements ArchiveFetcher {
+ 
+     private static Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
+ 
+     @Override
+     public InputStream fetch(URI uri) throws ArchiveFetchException {
+         logger.debug("Fetching file through http: {}", uri.toString());
+ 
+         String host = uri.getHost();
+         int port = uri.getPort();
+         if (port == -1) {
+             if (uri.getScheme().equalsIgnoreCase("http")) {
+                 port = 80;
+             } else if (uri.getScheme().equalsIgnoreCase("https")) {
+                 port = 443;
+             }
+         }
+ 
+         ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
+                 Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+         File tmpFile;
+         try {
+             tmpFile = File.createTempFile("http", "download");
+         } catch (IOException e) {
+             throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
+                     e);
+         }
+         clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
+         ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
+         // TODO timeout?
+         Channel channel = channelFuture.awaitUninterruptibly().getChannel();
+         if (!channelFuture.isSuccess()) {
+             clientBootstrap.releaseExternalResources();
+             throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
+                     channelFuture.getCause());
+         }
+ 
 -        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
++        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath());
+         request.setHeader(HttpHeaders.Names.HOST, host);
+         request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+         request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+ 
+         channel.write(request);
+ 
+         channel.getCloseFuture().awaitUninterruptibly();
+ 
+         clientBootstrap.releaseExternalResources();
+ 
+         logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
+                 tmpFile.getAbsolutePath());
+         try {
+             return new FileInputStream(tmpFile);
+         } catch (FileNotFoundException e) {
+             throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
+                     + tmpFile.getAbsolutePath() + "]");
+         }
+     }
+ 
+     private class HttpClientPipelineFactory implements ChannelPipelineFactory {
+ 
+         File tmpFile;
+ 
+         public HttpClientPipelineFactory(File tmpFile) {
+             this.tmpFile = tmpFile;
+         }
+ 
+         @Override
+         public ChannelPipeline getPipeline() throws Exception {
+             // Create a default pipeline implementation.
+             ChannelPipeline pipeline = Channels.pipeline();
+ 
+             pipeline.addLast("codec", new HttpClientCodec());
+ 
+             // Remove the following line if you don't want automatic content decompression.
+             pipeline.addLast("inflater", new HttpContentDecompressor());
+ 
+             pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
+             return pipeline;
+         }
+     }
+ 
+     // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
+     private class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+ 
+         private boolean readingChunks;
+         FileOutputStream fos;
+ 
+         public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
+             this.fos = new FileOutputStream(tmpFile);
+         }
+ 
+         @Override
+         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+             if (!readingChunks) {
+                 HttpResponse response = (HttpResponse) e.getMessage();
+ 
+                 if (response.isChunked()) {
+                     readingChunks = true;
+                 } else {
+                     copyContentToTmpFile(response.getContent());
+                 }
+             } else {
+                 HttpChunk chunk = (HttpChunk) e.getMessage();
+                 if (chunk.isLast()) {
+                     readingChunks = false;
+                     fos.close();
+                 } else {
+                     copyContentToTmpFile(chunk.getContent());
+                 }
+             }
+ 
+         }
+ 
+         private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException {
+             ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
+             ByteStreams.copy(cbis, fos);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index fad7ec4,bef0894..49ad568
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@@ -1,33 -1,5 +1,29 @@@
  s4.comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
  s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
  s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
 +
  # I/O channel connection timeout, when applicable (e.g. used by netty)
  s4.comm.timeout=1000
- s4.cluster.zk_address = localhost:2181
- s4.cluster.zk_session_timeout = 10000
- s4.cluster.zk_connection_timeout = 10000
- 
 +
 +# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure
 +
 +# how many threads to use for the sender stage (i.e. serialization)
 +s4.sender.parallelism=1
 +# maximum number of events in the buffer of the sender stage
 +s4.sender.workQueueSize=10000
 +# maximum sending rate from a given node, in events / s (used with throttling sender executors)
 +s4.sender.maxRate=200000
 +
 +# how many threads to use for the *remote* sender stage (i.e. serialization)
 +s4.remoteSender.parallelism=1
 +# maximum number of events in the buffer of the *remote* sender stage
 +s4.remoteSender.workQueueSize=10000
 +# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
 +s4.remoteSender.maxRate=200000
 +
 +# maximum number of pending writes to a given comm channel
 +s4.emitter.maxPendingWrites=1000
 +
 +# maximum number of events in the buffer of the processing stage
 +s4.stream.workQueueSize=10000
 +

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index a4bd8a2,0000000..e400813
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@@ -1,45 -1,0 +1,54 @@@
 +package org.apache.s4.comm.tcp;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import junit.framework.Assert;
 +
 +import org.apache.s4.base.Emitter;
 +import org.apache.s4.base.Listener;
 +import org.apache.s4.base.SerializerDeserializer;
- import org.apache.s4.comm.DefaultCommModule;
 +import org.apache.s4.fixtures.CommTestUtils;
 +import org.apache.s4.fixtures.MockReceiverModule;
 +import org.apache.s4.fixtures.NoOpReceiverModule;
 +import org.apache.s4.fixtures.TCPTransportModule;
++import org.apache.s4.fixtures.TestCommModule;
 +import org.apache.s4.fixtures.ZkBasedTest;
 +import org.junit.Test;
 +
 +import com.google.common.io.Resources;
 +import com.google.inject.Guice;
 +import com.google.inject.Injector;
++import com.google.inject.util.Modules;
 +
 +public class TCPBasicTest extends ZkBasedTest {
 +
++    public TCPBasicTest() {
++        super(2);
++    }
++
 +    @Test
 +    public void testSingleMessage() throws Exception {
 +
-         Injector injector1 = Guice.createInjector(
-                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                 new TCPTransportModule(), new NoOpReceiverModule());
++        Injector injector1 = Guice.createInjector(Modules.override(
++                new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++                new TCPTransportModule(), new NoOpReceiverModule()));
++
++        // this node picks partition 0
 +        Emitter emitter = injector1.getInstance(Emitter.class);
 +
-         Injector injector2 = Guice.createInjector(
-                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                 new TCPTransportModule(), new MockReceiverModule());
++        Injector injector2 = Guice.createInjector(Modules.override(
++                new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++                new TCPTransportModule(), new MockReceiverModule()));
++
 +        // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
 +        // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
 +        injector2.getInstance(Listener.class);
 +
-         emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
++        // send to the other node
++        emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
 +
 +        // check receiver got the message
 +        Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index 47cb39c,0000000..b06734b
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@@ -1,42 -1,0 +1,52 @@@
 +package org.apache.s4.comm.udp;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import junit.framework.Assert;
 +
 +import org.apache.s4.base.Emitter;
 +import org.apache.s4.base.Listener;
 +import org.apache.s4.base.SerializerDeserializer;
- import org.apache.s4.comm.DefaultCommModule;
 +import org.apache.s4.fixtures.CommTestUtils;
 +import org.apache.s4.fixtures.MockReceiverModule;
 +import org.apache.s4.fixtures.NoOpReceiverModule;
++import org.apache.s4.fixtures.TestCommModule;
 +import org.apache.s4.fixtures.UDPTransportModule;
 +import org.apache.s4.fixtures.ZkBasedTest;
 +import org.junit.Test;
 +
 +import com.google.common.io.Resources;
 +import com.google.inject.Guice;
 +import com.google.inject.Injector;
++import com.google.inject.util.Modules;
 +
 +public class UDPBasicTest extends ZkBasedTest {
 +
++    public UDPBasicTest() {
++        super(2);
++    }
++
 +    @Test
 +    public void testSingleMessage() throws Exception {
 +
-         Injector injector1 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
-                 .openStream(), "cluster1"), new UDPTransportModule(), new NoOpReceiverModule());
++        Injector injector1 = Guice.createInjector(Modules.override(
++                new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++                new UDPTransportModule(), new NoOpReceiverModule()));
++        // this picks partition 0
 +        Emitter emitter = injector1.getInstance(Emitter.class);
 +
-         Injector injector2 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
-                 .openStream(), "cluster1"), new UDPTransportModule(), new MockReceiverModule());
++        Injector injector2 = Guice.createInjector(Modules.override(
++                new TestCommModule(Resources.getResource("default.s4.comm.properties").openStream())).with(
++                new UDPTransportModule(), new MockReceiverModule()));
++
 +        // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
 +        // listener)
 +        injector2.getInstance(Listener.class);
 +
-         emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
++        // send to the other partition (1)
++        emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
 +
 +        Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 56d2f35,1f07bf6..7789b9f
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@@ -84,15 -87,19 +88,26 @@@ public class DefaultCoreModule extends 
          // For enabling checkpointing, one needs to use a custom module, such as
          // org.apache.s4.core.ft.FileSystemBackendCheckpointingModule
          bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 +
 +        // shed load in local sender only by default
 +        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
 +        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
 +
 +        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
 +
      }
  
+     @Provides
+     @Named("s4.tmp.dir")
+     public File provideTmpDir() {
+         File tmpS4Dir = Files.createTempDir();
+         tmpS4Dir.deleteOnExit();
+         logger.warn(
+                 "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+                 tmpS4Dir.getAbsolutePath());
+         return tmpS4Dir;
+     }
+ 
      private void loadProperties(Binder binder) {
          try {
              config = new PropertiesConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 0000000,e469121..77dbc67
mode 000000,100644..100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@@ -1,0 -1,228 +1,235 @@@
+ package org.apache.s4.core;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.I0Itec.zkclient.IZkDataListener;
+ import org.I0Itec.zkclient.serialize.ZkSerializer;
+ import org.apache.s4.base.util.ModulesLoader;
+ import org.apache.s4.comm.DefaultCommModule;
+ import org.apache.s4.comm.ModulesLoaderFactory;
+ import org.apache.s4.comm.topology.ZNRecord;
+ import org.apache.s4.comm.topology.ZNRecordSerializer;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.comm.util.ArchiveFetchException;
+ import org.apache.s4.comm.util.ArchiveFetcher;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.core.util.ParametersInjectionModule;
+ import org.apache.zookeeper.CreateMode;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.base.Strings;
+ import com.google.common.io.ByteStreams;
+ import com.google.common.io.Files;
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Inject;
+ import com.google.inject.Injector;
+ import com.google.inject.Module;
+ import com.google.inject.name.Named;
+ import com.google.inject.util.Modules;
+ import com.google.inject.util.Modules.OverriddenModuleBuilder;
+ 
+ /**
+  * This is the bootstrap for S4 nodes.
+  * <p>
+  * Its roles are to:
+  * <ul>
+  * <li>register within the S4 cluster (and acquire a partition).
+  * <li>look for application deployed on the S4 cluster
+  * </ul>
+  * <p>
+  * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+  * application code is then downloaded and the app started.
+  * <p>
+  * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+  * application classes are available in the classpath.
+  * 
+  * 
+  * 
+  */
+ public class S4Bootstrap {
+     private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
+ 
+     private final ZkClient zkClient;
+     private final String appPath;
+     private final AtomicBoolean deployed = new AtomicBoolean(false);
+ 
+     private final ArchiveFetcher fetcher;
+ 
+     private Injector parentInjector;
+ 
+     CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+ 
+     @Inject
+     public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
+             @Named("s4.cluster.zk_address") String zookeeperAddress,
+             @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+             @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
+ 
+         this.fetcher = fetcher;
+         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+         ZkSerializer serializer = new ZNRecordSerializer();
+         zkClient.setZkSerializer(serializer);
+ 
+         String appDir = "/s4/clusters/" + clusterName + "/app";
+         if (!zkClient.exists(appDir)) {
+             zkClient.create(appDir, null, CreateMode.PERSISTENT);
+         }
+         appPath = appDir + "/s4App";
+         zkClient.subscribeDataChanges(appPath, new AppChangeListener());
+     }
+ 
+     public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
+         this.parentInjector = parentInjector;
+         if (zkClient.exists(appPath)) {
+             if (!deployed.get()) {
+                 loadModulesAndStartNode(parentInjector);
+             }
+         }
+ 
+         signalOneAppLoaded.await();
+     }
+ 
+     private void loadModulesAndStartNode(final Injector parentInjector) throws ArchiveFetchException {
+ 
+         final ZNRecord appData = zkClient.readData(appPath);
+         // can be null
+         final AppConfig appConfig = new AppConfig(appData);
+ 
+         String appName = appConfig.getAppName();
+ 
+         List<File> modulesLocalCopies = new ArrayList<File>();
+ 
+         for (String uriString : appConfig.getCustomModulesURIs()) {
+             modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+         }
+         final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+ 
+         Thread t = new Thread(new Runnable() {
+ 
+             @Override
+             public void run() {
+                 // load app class through modules classloader and start it
+                 S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                 signalOneAppLoaded.countDown();
+             }
+         }, "S4 platform loader");
+         t.start();
+ 
+     }
+ 
+     private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+ 
+         URI uri;
+         try {
+             uri = new URI(uriString);
+         } catch (URISyntaxException e2) {
+             throw new ArchiveFetchException("Invalid module URI : [" + uriString + "]", e2);
+         }
+         File localModuleFileCopy;
+         try {
+             localModuleFileCopy = File.createTempFile("tmp", "module");
+         } catch (IOException e1) {
+             logger.error(
+                     "Cannot deploy app [{}] because a local copy of the module file could not be initialized due to [{}]",
+                     appName, e1.getClass().getName() + "->" + e1.getMessage());
+             throw new ArchiveFetchException("Cannot deploy application [" + appName + "]", e1);
+         }
+         localModuleFileCopy.deleteOnExit();
+         try {
+             if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localModuleFileCopy)) == 0) {
+                 throw new ArchiveFetchException("Cannot copy archive from [" + uri.toString() + "] to ["
+                         + localModuleFileCopy.getAbsolutePath() + "] (nothing was copied)");
+             }
+         } catch (Exception e) {
+             throw new ArchiveFetchException("Cannot deploy application [" + appName + "] from URI [" + uri.toString()
+                     + "] ", e);
+         }
+         return localModuleFileCopy;
+     }
+ 
+     public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+         try {
+             Injector injector;
+             InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+             InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+ 
+             logger.info("Initializing S4 app with : {}", appConfig.toString());
+ 
+             AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
+             AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
+ 
+             List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+             for (String moduleClass : appConfig.getCustomModulesNames()) {
+                 extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+             }
+             Module combinedModule = Modules.combine(commModule, coreModule);
+             if (extraModules.size() > 0) {
+                 OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+                 combinedModule = overridenModuleBuilder.with(extraModules);
+             }
+ 
+             if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+ 
+                 logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+                 Map<String, String> namedParameters = new HashMap<String, String>();
+ 
+                 namedParameters.putAll(appConfig.getNamedParameters());
+                 combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+             }
+ 
 -            injector = parentInjector.createChildInjector(combinedModule);
 -
+             if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
++                // In that case we won't be using an S4R classloader, app classes are available from the current
++                // classloader
++                // The app module provides bindings specific to the app class loader, in this case the current thread's
++                // class loader.
++                AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
++                // NOTE: the app module is used as the base here, so its bindings can be overridden by the combined module
++                combinedModule = Modules.override(appModule).with(combinedModule);
++                injector = parentInjector.createChildInjector(combinedModule);
+                 logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+                 App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+                 app.init();
+                 app.start();
+             } else {
++                injector = parentInjector.createChildInjector(combinedModule);
+                 if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+                     logger.info("S4 node in standby until app class or app URI is specified");
+                 }
+                 Server server = injector.getInstance(Server.class);
+                 server.start(injector);
+             }
+         } catch (Exception e) {
+             logger.error("Cannot start S4 node", e);
+             System.exit(1);
+         }
+     }
+ 
+     class AppChangeListener implements IZkDataListener {
+ 
+         @Override
+         public void handleDataChange(String dataPath, Object data) throws Exception {
+             if (!deployed.get()) {
+                 loadModulesAndStartNode(parentInjector);
+                 deployed.set(true);
+             }
+ 
+         }
+ 
+         @Override
+         public void handleDataDeleted(String dataPath) throws Exception {
+             logger.error("Application undeployment is not supported yet");
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index 3e2d617,5178c97..4ce4693
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@@ -29,6 -39,23 +39,24 @@@ public class FileSystemBackendCheckpoin
      protected void configure() {
          bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
          bind(CheckpointingFramework.class).to(SafeKeeper.class);
 +
      }
+ 
+     @Provides
+     @Named("s4.checkpointing.filesystem.storageRootPath")
+     public String provideStorageRootPath() {
+         File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                 + "storage");
+         String storageRootPath = defaultStorageDir.getAbsolutePath();
+         logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+         if (!defaultStorageDir.exists()) {
+             if (!(defaultStorageDir.mkdirs())) {
+                 logger.error("Storage directory not specified, and cannot create default storage directory : "
+                         + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+             }
+         }
+         return storageRootPath;
+ 
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 2b3f8c7,8d3d324..50846b4
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@@ -25,8 -25,11 +25,10 @@@ import java.util.concurrent.TimeUnit
  import junit.framework.Assert;
  
  import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
 -import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
  import org.apache.s4.comm.tcp.TCPEmitter;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
  import org.apache.s4.fixtures.CommTestUtils;
  import org.apache.s4.fixtures.CoreTestUtils;
  import org.apache.s4.fixtures.ZkBasedTest;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index da2bf5d,2f4f742..4dc9484
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@@ -27,9 -27,13 +27,12 @@@ import junit.framework.Assert
  
  import org.I0Itec.zkclient.IZkChildListener;
  import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
 -import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
  import org.apache.s4.comm.tcp.TCPEmitter;
  import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
+ import org.apache.s4.fixtures.CommTestUtils;
  import org.apache.s4.fixtures.CoreTestUtils;
  import org.apache.s4.fixtures.ZkBasedTest;
  import org.apache.zookeeper.KeeperException;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index 0000000,3b95334..e5d0d01
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@@ -1,0 -1,109 +1,110 @@@
+ package org.apache.s4.core.moduleloader;
+ 
+ import java.io.File;
+ import java.util.List;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ 
+ import junit.framework.Assert;
+ 
+ import org.apache.s4.base.Emitter;
+ import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.SerializerDeserializer;
+ import org.apache.s4.comm.DefaultCommModule;
++import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+ import org.apache.s4.comm.tcp.TCPEmitter;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.apache.s4.core.BaseModule;
+ import org.apache.s4.core.S4Node;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
+ import org.apache.s4.fixtures.CommTestUtils;
+ import org.apache.s4.fixtures.CoreTestUtils;
+ import org.apache.s4.wordcount.WordCountApp;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.ZooKeeper;
+ 
+ import com.beust.jcommander.internal.Lists;
+ import com.google.common.base.Strings;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.io.Resources;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+ 
+ public class ModuleLoaderTestUtils {
+ 
+     private static final int NB_MESSAGES = 10;
+ 
+     public static Process testModuleLoader(boolean fork) throws Exception {
+ 
+         Process forkedS4Node = null;
+         // build custom-modules.jar
+         File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+ 
+         CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+                 + "/test-apps/custom-modules/build.gradle"), "clean", new String[0]);
+ 
+         File modulesJarFile = new File(gradlewFile.getParentFile().getAbsolutePath()
+                 + "/test-apps/custom-modules/build/libs/app/custom-modules.jar");
+ 
+         Assert.assertFalse(modulesJarFile.exists());
+ 
+         CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+                 + "/test-apps/custom-modules/build.gradle"), "jar", new String[0]);
+ 
+         // make sure it is created
+         Assert.assertTrue(modulesJarFile.exists());
+ 
+         // pass it as a configuration
+         DeploymentUtils.initAppConfig(
+                 new AppConfig.Builder().appClassName(WordCountApp.class.getName())
+                         .customModulesURIs(ImmutableList.of(modulesJarFile.toURI().toString()))
+                         .customModulesNames(ImmutableList.of("org.apache.s4.TestListenerModule")).build(), "cluster1",
+                 true, "localhost:2181");
+         if (fork) {
+             forkedS4Node = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1" });
+         } else {
+             S4Node.main(new String[] { "-c", "cluster1" });
+         }
+ 
+         Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
+                 .openStream(), "cluster1"), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                 .openStream()));
+ 
+         Emitter emitter = injector.getInstance(TCPEmitter.class);
+         List<Long> messages = Lists.newArrayList();
+         for (int i = 0; i < NB_MESSAGES; i++) {
+             messages.add(System.currentTimeMillis());
+         }
+ 
+         ZkClient zkClient = new ZkClient("localhost:2181");
+         zkClient.create("/test", 0, CreateMode.PERSISTENT);
+ 
+         final ZooKeeper zk = CommTestUtils.createZkClient();
+         final CountDownLatch signalMessagesReceived = new CountDownLatch(1);
+ 
+         // watch for last message in test data sequence
+         CoreTestUtils.watchAndSignalCreation("/test/data" + Strings.padStart(String.valueOf(NB_MESSAGES - 1), 10, '0'),
+                 signalMessagesReceived, zk);
+ 
++        SerializerDeserializer serDeser = injector.getInstance(SerializerDeserializerFactory.class)
++                .createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+         for (Long message : messages) {
+             Event event = new Event();
+             event.put("message", long.class, message);
 -            emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
 -                    .serialize(event)));
++            event.setStreamId("inputStream");
++            emitter.send(0, serDeser.serialize(event));
+         }
+ 
+         // check sequential nodes in zk with correct data
+         Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+         List<String> children = zkClient.getChildren("/test");
+         for (String child : children) {
+             Long data = zkClient.readData("/test/" + child);
+             Assert.assertTrue(messages.contains(data));
+         }
+ 
+         return forkedS4Node;
+ 
+     }
 -
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
index 0000000,a1ee016..da1f62b
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/TestModuleLoader.java
@@@ -1,0 -1,18 +1,13 @@@
+ package org.apache.s4.core.moduleloader;
+ 
+ import org.apache.s4.fixtures.ZkBasedTest;
+ import org.junit.Test;
+ 
+ public class TestModuleLoader extends ZkBasedTest {
+ 
 -    public TestModuleLoader() {
 -        // need 2 partitions: 1 for the test emitter, 1 for the S4 node
 -        super(2);
 -    }
 -
+     @Test
+     public void testLocal() throws Exception {
+         ModuleLoaderTestUtils.testModuleLoader(false);
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 3549e25,d2e2c6a..8b5329b
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@@ -30,17 -34,17 +30,16 @@@ import org.I0Itec.zkclient.IZkChildList
  import org.I0Itec.zkclient.ZkClient;
  import org.apache.commons.configuration.ConfigurationException;
  import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
 -import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
  import org.apache.s4.comm.tcp.TCPEmitter;
- import org.apache.s4.comm.topology.ZNRecord;
  import org.apache.s4.comm.topology.ZNRecordSerializer;
+ import org.apache.s4.core.util.AppConfig;
  import org.apache.s4.fixtures.CommTestUtils;
  import org.apache.s4.fixtures.CoreTestUtils;
 +import org.apache.s4.fixtures.S4RHttpServer;
  import org.apache.s4.fixtures.ZkBasedTest;
- import org.apache.zookeeper.CreateMode;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 -import org.jboss.netty.handler.codec.http.HttpHeaders;
  import org.junit.After;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -105,12 -112,11 +104,11 @@@ public class TestAutomaticDeployment ex
          CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
                  CommTestUtils.createZkClient());
  
 -        DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true, "localhost:2181");
 -
 -        // ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
 -        // record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
 -        // zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
 +        if (createZkAppNode) {
 +            // otherwise we need to do that through a separate tool
-             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-             zkClient.create("/s4/clusters/cluster1/app/s4App", record, CreateMode.PERSISTENT);
++            DeploymentUtils.initAppConfig(new AppConfig.Builder().appURI(uri).build(), "cluster1", true,
++                    "localhost:2181");
 +        }
  
          Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
          Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@@ -193,33 -204,16 +192,23 @@@
  
      }
  
 -    // @Before
 -    // public void clean() throws Exception {
 -    // final ZooKeeper zk = CommTestUtils.createZkClient();
 -    // try {
 -    // zk.delete("/simpleAppCreated", -1);
 -    // } catch (Exception ignored) {
 -    // }
 -    //
 -    // zk.close();
 -    // }
 +    public static void checkNoAppAlreadyDeployed(ZkClient zkClient) {
 +        List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
 +        Assert.assertTrue(processes.size() == 0);
 +        final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 +
 +        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
 +
 +            @Override
 +            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
 +                if (currentChilds.size() == 2) {
 +                    signalProcessesReady.countDown();
 +                }
 +
 +            }
 +        });
-     }
 +
-     // @Before
-     // public void clean() throws Exception {
-     // final ZooKeeper zk = CommTestUtils.createZkClient();
-     // try {
-     // zk.delete("/simpleAppCreated", -1);
-     // } catch (Exception ignored) {
-     // }
-     //
-     // zk.close();
-     // }
++    }
  
      @After
      public void cleanup() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 8ee58e5,f322256..5b4f596
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@@ -25,12 -25,15 +25,14 @@@ import java.util.concurrent.TimeUnit
  import junit.framework.Assert;
  
  import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
 -import org.apache.s4.base.SerializerDeserializer;
  import org.apache.s4.comm.DefaultCommModule;
 +import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
  import org.apache.s4.comm.tcp.TCPEmitter;
- import org.apache.s4.core.AppModule;
+ import org.apache.s4.core.BaseModule;
  import org.apache.s4.core.DefaultCoreModule;
- import org.apache.s4.core.Main;
+ import org.apache.s4.core.S4Node;
+ import org.apache.s4.core.util.AppConfig;
+ import org.apache.s4.deploy.DeploymentUtils;
  import org.apache.s4.fixtures.CommTestUtils;
  import org.apache.s4.fixtures.ZkBasedTest;
  import org.apache.zookeeper.CreateMode;
@@@ -111,14 -107,13 +106,16 @@@ public class WordCountTest extends ZkBa
          Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
      }
  
 -    public void injectSentence(String sentence) throws IOException {
 +    public void injectSentence(String sentence) throws IOException, InterruptedException {
          Event event = new Event();
 +        event.setStreamId("inputStream");
          event.put("sentence", String.class, sentence);
+ 
 -        // NOTE: we send to partition 1 since partition 0 hosts the emitter
 -        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
 -                .serialize(event)));
++        // NOTE: we send to partition 0 since partition 1 hosts the emitter
 +        emitter.send(
 +                0,
 +                injector.getInstance(SerializerDeserializerFactory.class)
-                         .createSerializerDeserializer(getClass().getClassLoader()).serialize(event));
++                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
      }
  
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index ac8e214,69ebaa6..76a1d43
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@@ -67,18 -70,23 +71,21 @@@ public class Deploy extends S4ArgsBase 
                  System.exit(1);
              }
  
-             URI s4rURI;
 -            File s4rToDeploy = null;
++            URI s4rURI = null;
  
              if (deployArgs.s4rPath != null) {
 -                // 1. if there is an application archive, we use it
 -                s4rToDeploy = new File(deployArgs.s4rPath);
 -                if (!s4rToDeploy.exists()) {
 -                    logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
 -                    System.exit(1);
 -                } else {
 -                    logger.info(
 -                            "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
 -                            s4rToDeploy.getAbsolutePath());
 +                s4rURI = new URI(deployArgs.s4rPath);
-                 // if (!s4rToDeploy.exists()) {
-                 // logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
-                 // System.exit(1);
-                 // } else {
++                if (Strings.isNullOrEmpty(s4rURI.getScheme())) {
++                    // default is file
++                    s4rURI = new File(deployArgs.s4rPath).toURI();
+                 }
 +                logger.info(
 +                        "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
 +                        s4rURI.toString());
-             } else {
+             } else if (deployArgs.gradleBuildFile != null) {
+ 
+                 // 2. otherwise if there is a build file, we create the S4R archive from that
+ 
                  List<String> params = new ArrayList<String>();
                  // prepare gradle -P parameters, including passed gradle opts
                  params.addAll(deployArgs.gradleOpts);
@@@ -97,31 -105,29 +104,31 @@@
                          System.exit(1);
                      }
                  } else {
 -                    s4rToDeploy = tmpS4R;
 +                    s4rURI = tmpS4R.toURI();
                  }
-             }
+             } else {
+                 if (!Strings.isNullOrEmpty(deployArgs.appClass)) {
+                     // 3. otherwise if there is at least an app class specified (e.g. for running "s4 adapter"), we use
+                     // it and won't use an S4R
+                     logger.info("No S4R path specified, nor build file specified: this assumes the app is in the classpath");
+                 } else {
+                     logger.error("You must specify an S4R file, a build file to create an S4R from, or an appClass that will be in the classpath");
+                     System.exit(1);
+                 }
  
-             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-             record.putSimpleField(DistributedDeploymentManager.S4R_URI, s4rURI.toString());
-             record.putSimpleField("name", deployArgs.appName);
-             String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
-             if (zkClient.exists(deployedAppPath)) {
-                 ZNRecord readData = zkClient.readData(deployedAppPath);
-                 logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
-                         readData.getSimpleField("name"));
-                 System.exit(1);
              }
  
-             zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/app/s4App", record, CreateMode.PERSISTENT);
-             logger.info(
-                     "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
-                     new String[] { deployArgs.appName, deployArgs.clusterName,
-                             "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName, s4rURI.toString() });
- 
+             DeploymentUtils.initAppConfig(
+                     new AppConfig.Builder().appName(deployArgs.appName)
 -                            .appURI(s4rToDeploy != null ? s4rToDeploy.toURI().toString() : null)
 -                            .appClassName(deployArgs.appClass).customModulesNames(deployArgs.modulesClassesNames)
 -                            .customModulesURIs(deployArgs.modulesURIs)
++                            .appURI(s4rURI == null ? null : s4rURI.toString())
++                            .customModulesNames(deployArgs.modulesClassesNames)
++                            .customModulesURIs(deployArgs.modulesURIs).appClassName(deployArgs.appClass)
+                             .namedParameters(convertListArgsToMap(deployArgs.extraNamedParameters)).build(),
+                     deployArgs.clusterName, false, deployArgs.zkConnectionString);
              // Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
 -            System.exit(0);
 +            if (!deployArgs.testMode) {
 +                System.exit(0);
 +            }
          } catch (Exception e) {
              LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
          }
@@@ -155,9 -173,33 +174,35 @@@
          @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
          int timeout = 10000;
  
+         @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+         List<String> modulesURIs = new ArrayList<String>();
+ 
+         @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+         List<String> modulesClassesNames = new ArrayList<String>();
+ 
+         @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+         List<String> extraNamedParameters = new ArrayList<String>();
+ 
 +        @Parameter(names = "-testMode", description = "Special mode for regression testing", hidden = true)
 +        boolean testMode = false;
+     }
  
+     /**
+      * Parameters parsing utility.
+      * 
+      */
+     public static class InlineConfigParameterConverter implements IStringConverter<String> {
+ 
+         @Override
+         public String convert(String arg) {
+             Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
+             logger.info("processing inline configuration parameter {}", arg);
+             Matcher parameterMatcher = parameterPattern.matcher(arg);
+             if (!parameterMatcher.find()) {
+                 throw new IllegalArgumentException("Cannot understand parameter " + arg);
+             }
+             return parameterMatcher.group(1);
+         }
      }
  
      static class ExecGradle {
@@@ -198,7 -240,7 +243,5 @@@
                  connection.close();
              }
          }
--
      }
--
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
----------------------------------------------------------------------
diff --cc subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
index b9f3b5d,0000000..cff7359
mode 100644,000000..100644
--- a/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
+++ b/subprojects/s4-tools/src/test/java/org/apache/s4/tools/TestDeploy.java
@@@ -1,109 -1,0 +1,108 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.s4.tools;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +
 +import junit.framework.Assert;
 +
 +import org.I0Itec.zkclient.IZkChildListener;
 +import org.I0Itec.zkclient.ZkClient;
 +import org.apache.s4.comm.topology.ZNRecordSerializer;
 +import org.apache.s4.deploy.AppConstants;
 +import org.apache.s4.deploy.TestAutomaticDeployment;
 +import org.apache.s4.fixtures.CommTestUtils;
 +import org.apache.s4.fixtures.CoreTestUtils;
 +import org.apache.s4.fixtures.S4RHttpServer;
 +import org.apache.s4.fixtures.ZkBasedTest;
 +import org.junit.After;
 +import org.junit.Test;
 +
 +import com.google.common.io.ByteStreams;
 +import com.google.common.io.Files;
 +
 +public class TestDeploy extends ZkBasedTest {
 +
 +    private Process forkedNode;
 +    private S4RHttpServer s4rHttpServer;
 +
 +    @Test
 +    public void testDeployment() throws Exception {
 +        TestAutomaticDeployment.createS4RFiles();
 +
 +        ZkClient zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
 +        zkClient.setZkSerializer(new ZNRecordSerializer());
 +
 +        final CountDownLatch signalNodeReady = new CountDownLatch(1);
 +
 +        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
 +
 +            @Override
 +            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
 +                if (currentChilds.size() == 1) {
 +                    signalNodeReady.countDown();
 +                }
 +
 +            }
 +        });
 +
 +        TestAutomaticDeployment.checkNoAppAlreadyDeployed(zkClient);
 +
 +        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
 +
 +        Assert.assertTrue(signalNodeReady.await(10, TimeUnit.SECONDS));
 +
 +        // deploy app
 +
 +        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
 +
 +        File tmpDir = Files.createTempDir();
 +
 +        File s4rToDeploy = new File(tmpDir, String.valueOf(System.currentTimeMillis()));
 +
 +        Assert.assertTrue(ByteStreams.copy(
 +                Files.newInputStreamSupplier(new File(TestAutomaticDeployment.tmpAppsDir.getAbsolutePath()
 +                        + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
 +
 +        s4rHttpServer = new S4RHttpServer(8080, tmpDir);
 +        s4rHttpServer.start();
 +
-         // Deploy
 +        Deploy.main(new String[] { "-s4r", "http://localhost:8080/s4/" + s4rToDeploy.getName(), "-c", "cluster1",
 +                "-appName", "toto", "-testMode" });
 +
 +        TestAutomaticDeployment.assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), zkClient, false);
 +
 +        // check resource loading (we use a zkclient without custom serializer)
 +        ZkClient client2 = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
 +        Assert.assertEquals("Salut!", client2.readData("/resourceData"));
 +
 +    }
 +
 +    @After
 +    public void cleanup() throws IOException, InterruptedException {
 +        CoreTestUtils.killS4App(forkedNode);
 +        if (s4rHttpServer != null) {
 +            s4rHttpServer.stop();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
----------------------------------------------------------------------
diff --cc test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
index 0000000,3451a28..c3916f0
mode 000000,100644..100644
--- a/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
+++ b/test-apps/custom-modules/src/main/java/org/apache/s4/TestListener.java
@@@ -1,0 -1,136 +1,122 @@@
+ package org.apache.s4;
+ 
+ import java.net.InetSocketAddress;
+ import java.util.concurrent.Executors;
+ 
+ import org.apache.commons.lang.NotImplementedException;
+ import org.apache.s4.base.Event;
 -import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.Listener;
+ import org.apache.s4.base.SerializerDeserializer;
+ import org.apache.s4.comm.topology.Assignment;
+ import org.apache.s4.comm.topology.ClusterNode;
+ import org.apache.s4.comm.topology.ZkClient;
+ import org.jboss.netty.bootstrap.ServerBootstrap;
+ import org.jboss.netty.buffer.ChannelBuffer;
+ import org.jboss.netty.channel.Channel;
+ import org.jboss.netty.channel.ChannelFactory;
+ import org.jboss.netty.channel.ChannelFuture;
+ import org.jboss.netty.channel.ChannelFutureListener;
+ import org.jboss.netty.channel.ChannelHandlerContext;
+ import org.jboss.netty.channel.ChannelPipeline;
+ import org.jboss.netty.channel.ChannelPipelineFactory;
+ import org.jboss.netty.channel.Channels;
+ import org.jboss.netty.channel.ExceptionEvent;
+ import org.jboss.netty.channel.MessageEvent;
+ import org.jboss.netty.channel.SimpleChannelHandler;
+ import org.jboss.netty.channel.group.ChannelGroup;
+ import org.jboss.netty.channel.group.DefaultChannelGroup;
+ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+ import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.inject.Inject;
+ import com.google.inject.name.Named;
+ 
+ public class TestListener implements Listener {
+ 
+     private static Logger logger = LoggerFactory.getLogger(TestListener.class);
+ 
+     private ServerBootstrap bootstrap;
+     private final ChannelGroup channels = new DefaultChannelGroup();
+     private ZkClient zkClient;
+     private SerializerDeserializer serDeser;
+ 
+     @Inject
+     public TestListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, SerializerDeserializer serDeser) {
+         // wait for an assignment
+         ClusterNode node = assignment.assignClusterNode();
+         this.serDeser = serDeser;
+ 
+         ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                 Executors.newCachedThreadPool());
+ 
+         bootstrap = new ServerBootstrap(factory);
+ 
+         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+             @Override
+             public ChannelPipeline getPipeline() {
+                 ChannelPipeline p = Channels.pipeline();
+                 p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
+                 p.addLast("2", new MyChannelHandler());
+ 
+                 return p;
+             }
+         });
+ 
+         bootstrap.setOption("child.tcpNoDelay", true);
+         bootstrap.setOption("child.keepAlive", true);
+         bootstrap.setOption("child.reuseAddress", true);
+         bootstrap.setOption("child.connectTimeoutMillis", timeout);
+         bootstrap.setOption("readWriteFair", true);
+ 
+         Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
+         channels.add(c);
+ 
+         zkClient = new ZkClient("localhost:2181");
+     }
+ 
+     @Override
 -    public byte[] recv() {
 -        while (true) {
 -            try {
 -                Thread.sleep(100000);
 -            } catch (InterruptedException e) {
 -                // TODO Auto-generated catch block
 -                e.printStackTrace();
 -            }
 -        }
 -    }
 -
 -    @Override
+     public void close() {
+         try {
+             channels.close().await();
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+         }
+         bootstrap.releaseExternalResources();
+ 
+     }
+ 
+     @Override
+     public int getPartitionId() {
+         throw new NotImplementedException();
+     }
+ 
+     public class MyChannelHandler extends SimpleChannelHandler {
+         @Override
+         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+             channels.add(e.getChannel());
+             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
 -            Event event = (Event) serDeser.deserialize(((EventMessage) serDeser.deserialize(buffer.array()))
 -                    .getSerializedEvent());
++            Event event = (Event) serDeser.deserialize(buffer.toByteBuffer());
+ 
+             zkClient.createEphemeralSequential("/test/data", event.get("message", long.class));
+ 
+         }
+ 
+         @Override
+         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+             logger.error("Error", event.getCause());
+             Channel c = context.getChannel();
+             c.close().addListener(new ChannelFutureListener() {
+                 @Override
+                 public void operationComplete(ChannelFuture future) throws Exception {
+                     if (future.isSuccess())
+                         channels.remove(future.getChannel());
+                     else
+                         logger.error("FAILED to close channel");
+                 }
+             });
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/051082eb/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------


Mime
View raw message