incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [4/15] git commit: Performance improvements - upgraded kryo to version 2 - removed intermediate "EventMessage" class - refactored serialized message handling by using ByteBuffer instead of byte arrays (this reduces copies and allows further optimizations
Date Fri, 18 Jan 2013 12:16:46 GMT
Performance improvements
- upgraded kryo to version 2
- removed intermediate "EventMessage" class
- refactored serialized message handling by using ByteBuffer instead of
byte arrays
(this reduces copies and allows further optimizations)
- as a result, the internal API for sender and emitters has changed


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6fd20746
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6fd20746
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6fd20746

Branch: refs/heads/dev
Commit: 6fd20746cedaee45c68f3fcf00245f10a5c72d27
Parents: b0a904b
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Sep 20 17:10:25 2012 +0200
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Thu Sep 20 18:17:13 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |    7 +-
 .../src/main/java/org/apache/s4/base/Emitter.java  |    6 +-
 .../src/main/java/org/apache/s4/base/Event.java    |    5 -
 .../main/java/org/apache/s4/base/EventMessage.java |   72 ---------------
 .../src/main/java/org/apache/s4/base/Listener.java |    4 +-
 .../org/apache/s4/base/SerializerDeserializer.java |    8 +-
 subprojects/s4-benchmarks/README.md                |    5 +
 subprojects/s4-benchmarks/bench-cluster.sh         |    7 +-
 subprojects/s4-benchmarks/config/injector.config   |    2 +-
 .../apache/s4/benchmark/simpleApp1/Injector.java   |   35 +++++--
 .../apache/s4/benchmark/simpleApp1/SimpleApp.java  |    4 +
 .../apache/s4/benchmark/simpleApp1/SimplePE1.java  |   33 ++++---
 .../java/org/apache/s4/benchmark/utils/Utils.java  |    2 +-
 subprojects/s4-benchmarks/startNode.sh             |    3 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |    6 +-
 .../org/apache/s4/comm/serialize/KryoSerDeser.java |   68 ++++++++------
 .../serialize/SerializerDeserializerFactory.java   |    9 ++
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   18 ++--
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   16 ++--
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   14 ++-
 .../java/org/apache/s4/comm/udp/UDPListener.java   |   13 ++-
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |   11 ++-
 .../org/apache/s4/comm/util/PartitionInfo.java     |   16 ++--
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   15 +++
 subprojects/s4-core/s4-core.gradle                 |    2 +-
 .../src/main/java/org/apache/s4/core/App.java      |   13 ++-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    7 +-
 .../java/org/apache/s4/core/ProcessingElement.java |    5 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |   18 ++--
 .../main/java/org/apache/s4/core/RemoteSender.java |    6 +-
 .../java/org/apache/s4/core/RemoteSenders.java     |   32 ++++---
 .../src/main/java/org/apache/s4/core/Sender.java   |   20 ++--
 .../src/main/java/org/apache/s4/core/Stream.java   |   26 +++--
 .../java/org/apache/s4/core/util/S4Metrics.java    |    5 +-
 .../test/java/org/apache/s4/core/TriggerTest.java  |    5 +-
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    9 +-
 .../org/apache/s4/core/ft/FTWordCountTest.java     |   10 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |   24 +++--
 .../apache/s4/core/timers/MultithreadingTest.java  |    5 +-
 .../apache/s4/core/windowing/WindowingPETest.java  |    6 +-
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   10 +-
 .../org/apache/s4/fixtures/MockCommModule.java     |    6 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   10 +-
 43 files changed, 316 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 674009b..9755259 100644
--- a/build.gradle
+++ b/build.gradle
@@ -58,9 +58,9 @@ project.ext["libraries"] = [
     guice:              'com.google.inject:guice:3.0',
     aop_alliance:       'aopalliance:aopalliance:1.0',
     guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
-    kryo:               'com.googlecode:kryo:1.04',
+    kryo:               'com.esotericsoftware.kryo:kryo:2.17',
     minlog:             'com.googlecode:minlog:1.2',
-    reflectasm:         'com.googlecode:reflectasm:1.01',
+    reflectasm:         'com.esotericsoftware.reflectasm:reflectasm:1.07',
     netty:              'org.jboss.netty:netty:3.2.5.Final',
     mockito_core:       'org.mockito:mockito-core:1.9.0',
     commons_config:     'commons-configuration:commons-configuration:1.6',
@@ -81,7 +81,7 @@ project.ext["libraries"] = [
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
     jcommander:         'com.beust:jcommander:1.25',
-    asm:                'asm:asm:3.2',
+    asm:                'org.ow2.asm:asm:4.0',
     javax_inject:       'javax.inject:javax.inject:1',
     gradle_base_services: 'org.gradle:gradle-base-services:1.0',
     gradle_core: 'org.gradle:gradle-core:1.0',
@@ -142,6 +142,7 @@ subprojects {
         runtime(libraries.aop_alliance)
         runtime(libraries.minlog)
         runtime(libraries.gson)
+        compile(libraries.reflectasm)
         runtime(libraries.objenesis)
         runtime(libraries.kryo)
         runtime(libraries.netty)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index c73dd1c..a6ede8f 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -18,9 +18,11 @@
 
 package org.apache.s4.base;
 
+import java.nio.ByteBuffer;
+
 /**
  * Defines an event emitter, responsible for sending an event to a given partition of the cluster.
- *
+ * 
  */
 public interface Emitter {
 
@@ -33,7 +35,7 @@ public interface Emitter {
      * 
      * @return - true - if message is sent across successfully - false - if send fails
      */
-    boolean send(int partitionId, EventMessage message);
+    boolean send(int partitionId, ByteBuffer message);
 
     int getPartitionCount();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 4e47723..8180180 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -134,11 +134,6 @@ public class Event {
 
         Data<?> data = map.get(key);
 
-        if (type != data.type) {
-            logger.error("Trying to get a value of type {} for an attribute of type {}.", type, data.type);
-            return null;
-        }
-
         return (T) data.value;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java
deleted file mode 100644
index e777687..0000000
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.base;
-
-/**
- * <p>
- * Encapsulates application-level events of type {@link Event}.
- * </p>
- * <p>
- * Indeed, events that are defined at the application level can only be handled by the classloader of the corresponding
- * application.
- * </p>
- * <p>
- * Includes routing information (application name, stream name), so that this message can be dispatched at the
- * communication level.
- * </p>
- * 
- */
-public class EventMessage {
-
-    private String appName;
-    private String streamName;
-    private byte[] serializedEvent;
-
-    public EventMessage() {
-    }
-
-    /**
-     * 
-     * @param appName
-     *            name of the application
-     * @param streamName
-     *            name of the stream
-     * @param serializedEvent
-     *            application-specific {@link Event} instance in serialized form
-     */
-    public EventMessage(String appName, String streamName, byte[] serializedEvent) {
-        super();
-        this.appName = appName;
-        this.streamName = streamName;
-        this.serializedEvent = serializedEvent;
-    }
-
-    public String getAppName() {
-        return appName;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public byte[] getSerializedEvent() {
-        return serializedEvent;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
index 608d628..98a2c60 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
@@ -18,6 +18,8 @@
 
 package org.apache.s4.base;
 
+import java.nio.ByteBuffer;
+
 /**
  * 
  * Get a byte array received by a lower level layer.
@@ -33,7 +35,7 @@ public interface Listener {
      *         <li>null if the associated blocking thread is interrupted</li>
      *         </ul>
      */
-    byte[] recv();
+    ByteBuffer recv();
 
     public int getPartitionId();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java
index f7ef101..376f98d 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java
@@ -18,12 +18,14 @@
 
 package org.apache.s4.base;
 
+import java.nio.ByteBuffer;
+
 /**
  * Defines serialization and deserialization methods used within the S4 platform, typically for events and PEs.
- *
+ * 
  */
 public interface SerializerDeserializer {
-    public byte[] serialize(Object message);
+    public ByteBuffer serialize(Object message);
 
-    public Object deserialize(byte[] rawMessage);
+    public Object deserialize(ByteBuffer rawMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index f7109a3..9fa3f21 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -60,3 +60,8 @@ When the benchmark finishes, results are available in `measurements/injectors` f
 
 Most statistics files come from the probes of the platform and some of them use weighted moving averages. These are good for long running applications. For the benchmarks we also show instant rates, which are available in `injection-rate.csv` and `simplePE1.csv` files.
 
+## Notes
+
+In the current design of S4, messages sent to output streams are not queued by S4 and directly passed to the communication layer.
+
+This implies that if the communication layer is not able to send those messages at least as fast as they are generated, the buffer of pending messages will increase rapidly. This may lead to memory problems in the injector. Solving the problem requires tuning the number of parallel injection threads.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/bench-cluster.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/bench-cluster.sh b/subprojects/s4-benchmarks/bench-cluster.sh
index 2b51037..654f3c1 100755
--- a/subprojects/s4-benchmarks/bench-cluster.sh
+++ b/subprojects/s4-benchmarks/bench-cluster.sh
@@ -4,6 +4,7 @@
 HOSTS=$1
 INJECTOR_CONFIG=$2
 NODE_CONFIG=$3
+NB_INJECTORS=$4
 BENCH_ROOTDIR=`pwd`
 
 echo "hosts = $HOSTS"
@@ -26,7 +27,7 @@ do
 	((NB_NODES++))
 done
 
-(cd $BENCH_ROOTDIR/../../ && ./s4 zkServer -clusters=c=testCluster1:flp=12000:nbTasks=1,c=testCluster2:flp=13000:nbTasks=$NB_NODES &)
+(cd $BENCH_ROOTDIR/../../ && ./s4 zkServer -clusters=c=testCluster1:flp=12000:nbTasks=$NB_INJECTORS,c=testCluster2:flp=13000:nbTasks=$NB_NODES &)
 
 
 sleep 6
@@ -52,7 +53,9 @@ done
 
 sleep 15
 
-java -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG"
+for ((i = 1; i <= $NB_INJECTORS; i++)); do
+	java -Xmx200m -Xms200m -verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG" &
+done
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/config/injector.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/injector.config b/subprojects/s4-benchmarks/config/injector.config
index 365e734..c189b55 100644
--- a/subprojects/s4-benchmarks/config/injector.config
+++ b/subprojects/s4-benchmarks/config/injector.config
@@ -1,3 +1,3 @@
 -c=testCluster1
 -appClass=org.apache.s4.benchmark.simpleApp1.Injector
--p=s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=4,s4.benchmark.warmupIterations=100000,s4.benchmark.testIterations=500000,s4.benchmark.testSleepInterval=0,s4.benchmark.warmupSleepInterval=0,s4.benchmark.injector.parallelism=2
+-p=s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=2,s4.benchmark.warmupIterations=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.warmupSleepInterval=0,s4.benchmark.injector.parallelism=2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
index 614cd59..e2fd079 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
@@ -27,7 +27,6 @@ import com.google.inject.name.Named;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.CsvReporter;
 
 public class Injector extends AdapterApp {
 
@@ -73,9 +72,9 @@ public class Injector extends AdapterApp {
 
         File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors");
         if (!logDir.mkdirs()) {
-            throw new RuntimeException("Cannot create dir " + logDir.getAbsolutePath());
+            logger.debug("Cannot create dir " + logDir.getAbsolutePath());
         }
-        CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
+        // CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
         remoteStreamKeyFinder = new KeyFinder<Event>() {
 
             @Override
@@ -85,10 +84,21 @@ public class Injector extends AdapterApp {
         };
         super.onInit();
         ConsoleReporter.enable(30, TimeUnit.SECONDS);
+        // Metrics.shutdown();
         ZkClient zkClient = new ZkClient(zkString);
-        zkClient.createPersistent("/benchmarkConfig");
-        zkClient.createPersistent("/benchmarkConfig/warmupIterations", warmupIterations * parallelism);
-        zkClient.createPersistent("/benchmarkConfig/testIterations", testIterations * parallelism);
+        if (getReceiver().getPartition() == 0) {
+            zkClient.createPersistent("/benchmarkConfig");
+            zkClient.createPersistent("/benchmarkConfig/warmupIterations", warmupIterations * parallelism);
+            zkClient.createPersistent("/benchmarkConfig/testIterations", testIterations * parallelism);
+            zkClient.createPersistent("/warmup");
+            zkClient.createPersistent("/test");
+        }
+
+        if (!(getReceiver().getPartition() == 0)) {
+            zkClient.waitUntilExists("/test", TimeUnit.SECONDS, 10);
+        }
+        zkClient.createPersistent("/warmup/injector-" + getReceiver().getPartition());
+        zkClient.createPersistent("/test/injector-" + getReceiver().getPartition());
         zkClient.close();
     }
 
@@ -121,7 +131,8 @@ public class Injector extends AdapterApp {
             }
         });
 
-        CountDownLatch signalWarmupComplete = Utils.getReadySignal(zkString, "/warmup", keysCount);
+        CountDownLatch signalWarmupComplete = Utils.getReadySignal(zkString, "/warmup/injector-"
+                + getReceiver().getPartition(), keysCount);
 
         RemoteStream remoteStream = getRemoteStream();
         generateEvents(remoteStream, warmupIterations, keysCount, warmupSleepInterval, parallelism);
@@ -131,13 +142,15 @@ public class Injector extends AdapterApp {
         // now that we are certain app nodes are connected, check the target cluster
         ZkClient zkClient = new ZkClient(zkString);
         zkClient.setZkSerializer(new ZNRecordSerializer());
+
         ZNRecord readData = zkClient.readData("/s4/streams/" + getRemoteStream().getName() + "/consumers/"
                 + zkClient.getChildren("/s4/streams/" + getRemoteStream().getName() + "/consumers").get(0));
         String remoteClusterName = readData.getSimpleField("clusterName");
 
         int appPartitionCount = zkClient.countChildren("/s4/clusters/" + remoteClusterName + "/tasks");
         zkClient.close();
-        CountDownLatch signalBenchComplete = Utils.getReadySignal(zkString, "/test", appPartitionCount);
+        CountDownLatch signalBenchComplete = Utils.getReadySignal(zkString, "/test/injector-"
+                + getReceiver().getPartition(), appPartitionCount);
 
         try {
             signalWarmupComplete.await();
@@ -186,6 +199,7 @@ public class Injector extends AdapterApp {
             Event event = new Event();
             event.put("key", Integer.class, j);
             event.put("value", Long.class, stopKey);
+            event.put("injector", Integer.class, getReceiver().getPartition());
             logger.info("Sending stop event with key {}", stopKey);
             remoteStream.put(event);
         }
@@ -216,8 +230,9 @@ public class Injector extends AdapterApp {
             for (long i = 0; i < iterations; i++) {
                 for (int j = 0; j < keysCount; j++) {
                     Event event = new Event();
-                    event.put("key", Integer.class, j);
-                    event.put("value", Long.class, counter.incrementAndGet());
+                    event.put("key", int.class, j);
+                    event.put("value", long.class, counter.incrementAndGet());
+                    event.put("injector", Integer.class, getReceiver().getPartition());
                     // logger.info("{}/{}/{}/",
                     // new String[] { Thread.currentThread().getName(), String.valueOf(i), String.valueOf(j),
                     // String.valueOf(event.get("value")) });

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
index 6385bc9..e1570f9 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
@@ -44,6 +44,10 @@ public class SimpleApp extends App {
         Long warmupIterations = zkClient.readData("/benchmarkConfig/warmupIterations");
         Long testIterations = zkClient.readData("/benchmarkConfig/testIterations");
 
+        // TODO fix hardcoded cluster name (pass injector config?)
+        int nbInjectors = zkClient.countChildren("/s4/clusters/testCluster1/tasks");
+        simplePE1.setNbInjectors(nbInjectors);
+
         simplePE1.setWarmupIterations(warmupIterations);
         simplePE1.setTestIterations(testIterations);
         createInputStream("inputStream", new KeyFinder<Event>() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
index 1fd40fb..20e4fc5 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
@@ -18,11 +18,13 @@ public class SimplePE1 extends ProcessingElement {
     private static Logger logger = LoggerFactory.getLogger(SimplePE1.class);
 
     private long warmupIterations = -1;
-    boolean warmedUp = false;
+    int warmedUp = 0;
+    int finished = 0;
     private long testIterations = -1;
     AtomicLong counter = new AtomicLong();
     BigDecimal rate;
     long lastTime = -1;
+    int nbInjectors;
 
     public void setWarmupIterations(long warmupIterations) {
         this.warmupIterations = warmupIterations;
@@ -32,6 +34,10 @@ public class SimplePE1 extends ProcessingElement {
         this.testIterations = testIterations;
     }
 
+    public void setNbInjectors(int nbInjectors) {
+        this.nbInjectors = nbInjectors;
+    }
+
     public void onEvent(Event event) {
         counter.incrementAndGet();
 
@@ -47,19 +53,20 @@ public class SimplePE1 extends ProcessingElement {
             }
         }
 
-        Long value = event.get("value", Long.class);
+        Long value = event.get("value", long.class);
         // logger.info("reached value {}", value);
-        if (!warmedUp && (value == -1)) {
-            logger.info("**** Warmed up");
-            addSequentialNode("/warmup");
-            warmedUp = true;
-
-        } else if (value == (-2)) {
-            logger.info("******* finished **************");
-
-            addSequentialNode("/test");
-            System.exit(0);
-            logger.info("SADFASFDASFDASFDASFDASDFASDFASDF**************");
+        if (!(warmedUp == nbInjectors) && (value == -1)) {
+            logger.info("**** Warmed up for an injector");
+            addSequentialNode("/warmup/injector-" + event.get("injector", Integer.class));
+            warmedUp++;
+
+        } else if (!(finished == nbInjectors) && (value == (-2))) {
+            logger.info("******* finished an injector **************");
+            finished++;
+            addSequentialNode("/test/injector-" + event.get("injector", Integer.class));
+            if (finished == nbInjectors) {
+                System.exit(0);
+            }
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
index 2f28feb..bf1023a 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
@@ -26,7 +26,7 @@ public class Utils {
             public void handleChildChange(String arg0, List<String> arg1) throws Exception {
 
                 if (parentPath.equals(arg0)) {
-                    if (arg1.size() == counts) {
+                    if (arg1.size() >= counts) {
                         logger.info("Latch reached for {} with {} children", arg0, counts);
                         signalReady.countDown();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-benchmarks/startNode.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/startNode.sh b/subprojects/s4-benchmarks/startNode.sh
index 53ba4b1..ed9cf3d 100755
--- a/subprojects/s4-benchmarks/startNode.sh
+++ b/subprojects/s4-benchmarks/startNode.sh
@@ -13,9 +13,8 @@ cd $BENCH_ROOTDIR
 
 
 # you may add profiling to the application nodes using the correct options for your system
-#PROFILING_OPTS="-agentpath:/Applications/YourKit_Java_Profiler_11.0.8.app/bin/mac/libyjpagent.jnilib=delay=10000,onexit=snapshot,onexit=memory,sampling,monitors"
 PROFILING_OPTS=""
 
-java $PROFILING_OPTS -server -cp `cat classpath.txt` org.apache.s4.core.Main "@$NODE_CONFIG" &
+java -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc-$host.log $PROFILING_OPTS -server -cp `cat classpath.txt` org.apache.s4.core.Main "@$NODE_CONFIG" &
 
 # java -cp `cat classpath.txt` org.apache.s4.core.Main "@`pwd`/src/main/resources/injector.config"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..c7c43a2 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,6 +31,7 @@ import org.apache.s4.base.Listener;
 import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -87,7 +88,10 @@ public class DefaultCommModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
         /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
+        // classloader
+        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+                SerializerDeserializerFactory.class));
 
         // a node holds a single partition assignment
         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
index f622de6..bff7b4b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
@@ -23,9 +23,10 @@ import java.nio.ByteBuffer;
 import org.apache.s4.base.SerializerDeserializer;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.ObjectBuffer;
-import com.esotericsoftware.kryo.serialize.ClassSerializer;
-import com.esotericsoftware.kryo.serialize.SimpleSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 
 /**
  * Serializer/deserializer based on <a href="http://code.google.com/p/kryo/">kryo</a>
@@ -33,60 +34,65 @@ import com.esotericsoftware.kryo.serialize.SimpleSerializer;
  */
 public class KryoSerDeser implements SerializerDeserializer {
 
-    private Kryo kryo = new Kryo();
+    private ThreadLocal<Kryo> kryoThreadLocal;
+    private ThreadLocal<Output> outputThreadLocal;
 
     private int initialBufferSize = 2048;
     private int maxBufferSize = 256 * 1024;
 
-    public void setInitialBufferSize(int initialBufferSize) {
-        this.initialBufferSize = initialBufferSize;
-    }
-
     public void setMaxBufferSize(int maxBufferSize) {
         this.maxBufferSize = maxBufferSize;
     }
 
-    public KryoSerDeser() {
-        this(Thread.currentThread().getContextClassLoader());
-    }
-
     /**
      * 
      * @param classLoader
      *            classloader able to handle classes to serialize/deserialize. For instance, application-level events
      *            can only be handled by the application classloader.
      */
-    public KryoSerDeser(ClassLoader classLoader) {
-        kryo.setClassLoader(classLoader);
-        kryo.setRegistrationOptional(true);
+    @Inject
+    public KryoSerDeser(@Assisted final ClassLoader classLoader) {
+        kryoThreadLocal = new ThreadLocal<Kryo>() {
 
-        kryo.register(Class.class, new ClassSerializer(kryo));
-        // UUIDs don't have a no-arg constructor.
-        kryo.register(java.util.UUID.class, new SimpleSerializer<java.util.UUID>() {
             @Override
-            public java.util.UUID read(ByteBuffer buf) {
-                return new java.util.UUID(buf.getLong(), buf.getLong());
+            protected Kryo initialValue() {
+                Kryo kryo = new Kryo();
+                kryo.setClassLoader(classLoader);
+                kryo.setRegistrationRequired(false);
+                return kryo;
             }
+        };
 
+        outputThreadLocal = new ThreadLocal<Output>() {
             @Override
-            public void write(ByteBuffer buf, java.util.UUID uuid) {
-                buf.putLong(uuid.getMostSignificantBits());
-                buf.putLong(uuid.getLeastSignificantBits());
+            protected Output initialValue() {
+                Output output = new Output(initialBufferSize, maxBufferSize);
+                return output;
             }
-
-        });
+        };
 
     }
 
     @Override
-    public Object deserialize(byte[] rawMessage) {
-        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
-        return buffer.readClassAndObject(rawMessage);
+    public Object deserialize(ByteBuffer rawMessage) {
+        Input input = new Input(rawMessage.array());
+        try {
+            return kryoThreadLocal.get().readClassAndObject(input);
+        } finally {
+            input.close();
+        }
     }
 
+    @SuppressWarnings("resource")
     @Override
-    public byte[] serialize(Object message) {
-        ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
-        return buffer.writeClassAndObject(message);
+    public ByteBuffer serialize(Object message) {
+        Output output = outputThreadLocal.get();
+        try {
+            kryoThreadLocal.get().writeClassAndObject(output, message);
+
+            return ByteBuffer.wrap(output.toBytes());
+        } finally {
+            output.clear();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java
new file mode 100644
index 0000000..f491264
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java
@@ -0,0 +1,9 @@
+package org.apache.s4.comm.serialize;
+
+import org.apache.s4.base.SerializerDeserializer;
+
+public interface SerializerDeserializerFactory {
+
+    SerializerDeserializer createSerializerDeserializer(ClassLoader classLoader);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index a6cc8b5..b6779a0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -20,14 +20,15 @@ package org.apache.s4.comm.tcp;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.s4.base.Emitter;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
@@ -91,6 +91,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private final Lock lock;
 
     @Inject
+    SerializerDeserializerFactory serDeserFactory;
     SerializerDeserializer serDeser;
 
     CommMetrics.EmitterMetrics metrics;
@@ -103,7 +104,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
-        partitionChannelMap = Maps.synchronizedBiMap(HashBiMap.<Integer, Channel> create(clusterSize));
+        partitionChannelMap = HashBiMap.create(clusterSize);
         partitionNodeMap = HashBiMap.create(clusterSize);
 
         // Initialize netty related structures
@@ -131,6 +132,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private void init() {
         refreshCluster();
         this.topology.addListener(this);
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+
         metrics = new EmitterMetrics(topology);
     }
 
@@ -161,9 +164,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         return false;
     }
 
-    private void sendMessage(int partitionId, byte[] message) {
-        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-        buffer.writeBytes(message);
+    private void sendMessage(int partitionId, ByteBuffer message) {
+        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
 
         if (!partitionChannelMap.containsKey(partitionId)) {
             if (!connectTo(partitionId)) {
@@ -183,8 +185,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     }
 
     @Override
-    public boolean send(int partitionId, EventMessage message) {
-        sendMessage(partitionId, serDeser.serialize(message));
+    public boolean send(int partitionId, ByteBuffer message) {
+        sendMessage(partitionId, message);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index d495033..b2700e5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -19,6 +19,7 @@
 package org.apache.s4.comm.tcp;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
@@ -56,7 +57,7 @@ import com.google.inject.name.Named;
  */
 public class TCPListener implements Listener {
     private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-    private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
+    private BlockingQueue<ChannelBuffer> handoffQueue = new SynchronousQueue<ChannelBuffer>();
     private ClusterNode node;
     private ServerBootstrap bootstrap;
     private final ChannelGroup channels = new DefaultChannelGroup();
@@ -93,10 +94,9 @@ public class TCPListener implements Listener {
         channels.add(c);
     }
 
-    public byte[] recv() {
+    public ByteBuffer recv() {
         try {
-            byte[] msg = handoffQueue.take();
-            return msg;
+            return handoffQueue.take().toByteBuffer();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return null;
@@ -118,9 +118,9 @@ public class TCPListener implements Listener {
     }
 
     public class ChannelHandler extends SimpleChannelHandler {
-        private BlockingQueue<byte[]> handoffQueue;
+        private BlockingQueue<ChannelBuffer> handoffQueue;
 
-        public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
+        public ChannelHandler(BlockingQueue<ChannelBuffer> handOffQueue) {
             this.handoffQueue = handOffQueue;
         }
 
@@ -128,8 +128,8 @@ public class TCPListener implements Listener {
             channels.add(e.getChannel());
             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
             try {
-                handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
-                                                  // there's no receiver at the other end of the handoff queue
+                handoffQueue.put(buffer); // this holds up the Netty upstream I/O thread if
+                                          // there's no receiver at the other end of the handoff queue
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 2f156b2..d335970 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -23,12 +23,13 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.SocketException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.s4.base.Emitter;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
@@ -39,7 +40,7 @@ import com.google.inject.Inject;
 
 /**
  * UDP based emitter.
- *
+ * 
  */
 public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
@@ -49,6 +50,7 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
     private final Cluster topology;
 
     @Inject
+    SerializerDeserializerFactory serDeserFactory;
     SerializerDeserializer serDeser;
 
     public long getMessageDropInQueueCount() {
@@ -72,14 +74,14 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
 
     @Inject
     private void init() {
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
         topology.addListener(this);
         refreshCluster();
     }
 
     @Override
-    public boolean send(int partitionId, EventMessage eventMessage) {
+    public boolean send(int partitionId, ByteBuffer message) {
         try {
-            byte[] message = serDeser.serialize(eventMessage);
             ClusterNode node = nodes.get(partitionId);
             if (node == null) {
                 LoggerFactory.getLogger(getClass()).error(
@@ -87,8 +89,8 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
                         partitionId);
                 return false;
             }
-            byte[] byteBuffer = new byte[message.length];
-            System.arraycopy(message, 0, byteBuffer, 0, message.length);
+            byte[] byteBuffer = new byte[message.array().length];
+            System.arraycopy(message.array(), 0, byteBuffer, 0, message.array().length);
             InetAddress inetAddress = inetCache.get(partitionId);
             if (inetAddress == null) {
                 inetAddress = InetAddress.getByName(node.getMachineName());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 319c7ab..d7f6275 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -21,12 +21,15 @@ package org.apache.s4.comm.udp;
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
+import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.s4.base.Listener;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 import com.google.inject.Inject;
 
@@ -41,7 +44,7 @@ public class UDPListener implements Listener, Runnable {
     private DatagramPacket datagram;
     private byte[] bs;
     static int BUFFER_LENGTH = 65507;
-    private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
+    private BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
     private ClusterNode node;
 
     @Inject
@@ -74,11 +77,11 @@ public class UDPListener implements Listener, Runnable {
         try {
             while (!Thread.interrupted()) {
                 socket.receive(datagram);
-                byte[] data = new byte[datagram.getLength()];
-                System.arraycopy(datagram.getData(), datagram.getOffset(), data, 0, data.length);
+                ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(datagram.getData(), datagram.getOffset(),
+                        datagram.getLength());
                 datagram.setLength(BUFFER_LENGTH);
                 try {
-                    handoffQueue.put(data);
+                    handoffQueue.put(copiedBuffer.toByteBuffer());
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                 }
@@ -88,7 +91,7 @@ public class UDPListener implements Listener, Runnable {
         }
     }
 
-    public byte[] recv() {
+    public ByteBuffer recv() {
         try {
             return handoffQueue.take();
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
index 17340b8..22b68a0 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
@@ -18,9 +18,11 @@
 
 package org.apache.s4.comm;
 
+import java.nio.ByteBuffer;
+
 import org.apache.s4.base.Emitter;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Listener;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
@@ -68,8 +70,9 @@ public class DeliveryTestUtil {
             try {
                 for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
                     for (int i = 0; i < numMessages; i++) {
-                        byte[] message = (new String("message-" + i)).getBytes();
-                        emitter.send(partition, new EventMessage(null, null, message));
+                        ByteBuffer message = ChannelBuffers.wrappedBuffer((new String("message-" + i)).getBytes())
+                                .toByteBuffer();
+                        emitter.send(partition, message);
                         Thread.sleep(interval);
                     }
                 }
@@ -130,7 +133,7 @@ public class DeliveryTestUtil {
             TimerThread timer = new TimerThread(this);
             timer.start();
             while (messagesReceived < messagesExpected) {
-                byte[] message = listener.recv();
+                ByteBuffer message = listener.recv();
                 timer.resetSleepCounter();
                 if (message != null)
                     messagesReceived++;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
index 97cc973..e744d39 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@ -18,14 +18,15 @@
 
 package org.apache.s4.comm.util;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
 
 import org.apache.s4.base.Emitter;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,13 +57,13 @@ public class PartitionInfo {
     private int partitionId;
     private ProtocolTestUtil ptu;
 
-    @Inject
     SerializerDeserializer serDeser;
 
     @Inject
-    public PartitionInfo(Emitter emitter, Listener listener) {
+    public PartitionInfo(Emitter emitter, Listener listener, SerializerDeserializerFactory serDeserFactory) {
         this.emitter = emitter;
         this.listener = listener;
+        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
         this.partitionId = this.listener.getPartitionId();
         logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
                 this.listener.getPartitionId());
@@ -90,10 +91,8 @@ public class PartitionInfo {
             try {
                 for (int i = 0; i < numMessages; i++) {
                     for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                        EventMessage message = new EventMessage("app1", "stream1",
-                                new String(partitionId + " " + i).getBytes());
                         for (int retries = 0; retries < numRetries; retries++) {
-                            if (emitter.send(partition, message)) {
+                            if (emitter.send(partition, ByteBuffer.wrap(new String(partitionId + " " + i).getBytes()))) {
                                 sendCounts[partition]++;
                                 break;
                             }
@@ -128,15 +127,14 @@ public class PartitionInfo {
         @Override
         public void run() {
             while (messagesReceived < ptu.expectedMessages[partitionId]) {
-                byte[] message = listener.recv();
+                ByteBuffer message = listener.recv();
                 if (message == null) {
                     logger.error("ReceiveThread {}: received a null message", partitionId);
                     break;
                 }
 
-                EventMessage deserialized = (EventMessage) serDeser.deserialize(message);
                 // process and store the message
-                String msgString = new String(deserialized.getSerializedEvent());
+                String msgString = new String(message.array());
                 String[] msgTokens = msgString.split(" ");
                 Integer senderPartition = Integer.parseInt(msgTokens[0]);
                 Integer receivedMsg = Integer.parseInt(msgTokens[1]);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 411c7c6..89f4982 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -19,12 +19,14 @@
 package org.apache.s4.fixtures;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,8 +44,21 @@ public abstract class ZkBasedTest {
         this.numTasks = numTasks;
     }
 
+    @BeforeClass
+    public static void initClass() {
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Uncaught error in thread {}: {}", t.getName(), e);
+
+            }
+        });
+    }
+
     @Before
     public void prepare() throws IOException, InterruptedException, KeeperException {
+
         CommTestUtils.cleanupTmpDirs();
 
         zkFactory = CommTestUtils.startZookeeperServer();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 0905ce7..7675e15 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -26,7 +26,7 @@ dependencies {
     compile libraries.jcommander
     compile libraries.asm
     compile libraries.netty
-    compile libraries.zkclient
+    compile libraries.zkclient
     testCompile project(path: ':s4-comm', configuration: 'tests')
     testCompile libraries.gradle_base_services
     testCompile libraries.gradle_core

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index cf64080..43d137f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -27,7 +27,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
@@ -85,7 +85,14 @@ public abstract class App {
     CheckpointingFramework checkpointingFramework;
 
     // serialization uses the application class loader
-    private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
+    @Inject
+    private SerializerDeserializerFactory serDeserFactory;
+    private SerializerDeserializer serDeser;
+
+    @Inject
+    private void initSerDeser() {
+        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+    }
 
     /**
      * The internal clock can be configured as "wall clock" or "event clock". The wall clock computes time from the
@@ -299,7 +306,7 @@ public abstract class App {
             ProcessingElement... processingElements) {
 
         return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements).setEventType(eventType)
-                .register();
+                .setSerializerDeserializerFactory(serDeserFactory).register();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index bdf9379..8874b93 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,10 +25,8 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.core.util.S4Metrics;
@@ -74,9 +72,6 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
         bind(S4RLoaderFactory.class);
@@ -85,7 +80,7 @@ public class DefaultCoreModule extends AbstractModule {
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 
-        bind(S4Metrics.class).asEagerSingleton();
+        bind(S4Metrics.class);
     }
 
     private void loadProperties(Binder binder) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index aca9327..b1a0c10 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -21,6 +21,7 @@ package org.apache.s4.core;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.TimerTask;
@@ -792,11 +793,11 @@ public abstract class ProcessingElement implements Cloneable {
     }
 
     public byte[] serializeState() {
-        return getApp().getSerDeser().serialize(this);
+        return getApp().getSerDeser().serialize(this).array();
     }
 
     public ProcessingElement deserializeState(byte[] loadedState) {
-        return (ProcessingElement) getApp().getSerDeser().deserialize(loadedState);
+        return (ProcessingElement) getApp().getSerDeser().deserialize(ByteBuffer.wrap(loadedState));
     }
 
     public void restoreState(ProcessingElement oldState) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 23c8f2d..1cf8fe8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -18,12 +18,13 @@
 
 package org.apache.s4.core;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,9 +58,9 @@ public class Receiver implements Runnable {
     private Thread thread;
 
     @Inject
-    public Receiver(Listener listener, SerializerDeserializer serDeser) {
+    public Receiver(Listener listener, SerializerDeserializerFactory serDeserFactory) {
         this.listener = listener;
-        this.serDeser = serDeser;
+        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
 
         thread = new Thread(this, "Receiver");
         // TODO avoid starting the thread here
@@ -98,11 +99,10 @@ public class Receiver implements Runnable {
         // TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener
         // here?
         while (!Thread.interrupted()) {
-            byte[] message = listener.recv();
-            S4Metrics.receivedEvent(message.length);
-            EventMessage event = (EventMessage) serDeser.deserialize(message);
+            ByteBuffer message = listener.recv();
+            S4Metrics.receivedEvent(message.array().length);
+            Event event = (Event) serDeser.deserialize(message);
 
-            int appId = Integer.valueOf(event.getAppName());
             String streamId = event.getStreamName();
 
             /*
@@ -110,9 +110,9 @@ public class Receiver implements Runnable {
              * make this more efficient for the case in which we send the same event to multiple PEs.
              */
             try {
-                streams.get(appId).get(streamId).receiveEvent(event);
+                streams.get(-1).get(streamId).receiveEvent(event);
             } catch (NullPointerException e) {
-                logger.error("Could not find target stream for event with appId={} and streamId={}", appId, streamId);
+                logger.error("Could not find target stream for event with streamId={}", streamId);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 2d772bf..cc9d70b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -18,10 +18,10 @@
 
 package org.apache.s4.core;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.s4.base.Emitter;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.core.util.S4Metrics;
 
@@ -46,7 +46,7 @@ public class RemoteSender {
 
     }
 
-    public void send(String hashKey, EventMessage eventMessage) {
+    public void send(String hashKey, ByteBuffer message) {
         int partition;
         if (hashKey == null) {
             // round robin by default
@@ -54,7 +54,7 @@ public class RemoteSender {
         } else {
             partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
         }
-        emitter.send(partition, eventMessage);
+        emitter.send(partition, message);
         S4Metrics.sentEventToRemoteCluster(remoteClusterName, partition);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 282e747..9c9e09c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.RemoteStreams;
@@ -44,35 +44,39 @@ public class RemoteSenders {
 
     Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
 
-    @Inject
-    RemoteEmitters emitters;
+    RemoteEmitters remoteEmitters;
 
-    @Inject
-    RemoteStreams streams;
+    RemoteStreams remoteStreams;
 
-    @Inject
-    Clusters topologies;
+    Clusters remoteClusters;
 
-    @Inject
     SerializerDeserializer serDeser;
 
-    @Inject
     Hasher hasher;
 
     ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
 
+    @Inject
+    public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher) {
+        this.remoteEmitters = remoteEmitters;
+        this.remoteStreams = remoteStreams;
+        this.remoteClusters = remoteClusters;
+        this.hasher = hasher;
+
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+    }
+
     public void send(String hashKey, Event event) {
 
-        Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
+        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
         event.setAppId(-1);
-        EventMessage eventMessage = new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(),
-                serDeser.serialize(event));
         for (StreamConsumer consumer : consumers) {
             // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
             // represented by a single stream consumer
             RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
             if (sender == null) {
-                RemoteSender newSender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer
+                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
                         .getClusterName())), hasher, consumer.getClusterName());
                 // TODO cleanup when remote topologies die
                 sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
@@ -82,7 +86,7 @@ public class RemoteSenders {
             }
             // we must set the app id of the consumer app for correct dispatch within the consumer node
             // NOTE: this implies multiple serializations, there might be an optimization
-            sender.send(hashKey, eventMessage);
+            sender.send(hashKey, serDeser.serialize(event));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 32cdde1..42cffa2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -18,11 +18,13 @@
 
 package org.apache.s4.core;
 
+import java.nio.ByteBuffer;
+
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.util.S4Metrics;
@@ -60,9 +62,9 @@ public class Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
+    public Sender(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher, Assignment assignment) {
         this.emitter = emitter;
-        this.serDeser = serDeser;
+        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
         this.hasher = hasher;
         this.assignment = assignment;
     }
@@ -92,15 +94,14 @@ public class Sender {
             /* Hey we are in the same JVM, don't use the network. */
             return false;
         }
-        send(partition,
-                new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser.serialize(event)));
+        send(partition, serDeser.serialize(event));
         S4Metrics.sentEvent(partition);
         return true;
     }
 
-    private void send(int partition, EventMessage event) {
+    private void send(int partition, ByteBuffer message) {
 
-        emitter.send(partition, event);
+        emitter.send(partition, message);
     }
 
     /**
@@ -116,10 +117,7 @@ public class Sender {
 
             /* Don't use the comm layer when we send to the same partition. */
             if (localPartitionId != i) {
-                emitter.send(
-                        i,
-                        new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser
-                                .serialize(event)));
+                emitter.send(i, serDeser.serialize(event));
                 S4Metrics.sentEvent(i);
 
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index b837d6d..53ddc7a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -23,10 +23,11 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.GenericKeyFinder;
 import org.apache.s4.base.Key;
 import org.apache.s4.base.KeyFinder;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +52,14 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     private String name;
     protected Key<T> key;
     private ProcessingElement[] targetPEs;
-    protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
+    protected final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
     private Thread thread;
     final private Sender sender;
     final private Receiver receiver;
     // final private int id;
     final private App app;
     private Class<T> eventType = null;
+    SerializerDeserializer serDeser;
 
     /**
      * Send events using a {@link KeyFinder}. The key finder extracts the value of the key which is used to determine
@@ -191,8 +193,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
                      * Sender checked and decided that the target is local so we simply put the event in the queue and
                      * we save the trip over the network.
                      */
-                    queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app
-                            .getSerDeser().serialize(event)));
+                    // TODO no need to serialize for local queue
+                    queue.put(event);
                 }
 
             } else {
@@ -204,8 +206,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
                  */
                 sender.sendToRemotePartitions(event);
 
-                queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser()
-                        .serialize(event)));
+                // TODO no need to serialize for local queue
+                queue.put(event);
                 // TODO abstraction around queue and add dropped counter
                 // TODO add counter for local events
 
@@ -219,7 +221,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     /**
      * The low level {@link Receiver} object call this method when a new {@link Event} is available.
      */
-    public void receiveEvent(EventMessage event) {
+    public void receiveEvent(Event event) {
         try {
             queue.put(event);
             // TODO abstraction around queue and add dropped counter
@@ -283,12 +285,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
         while (true) {
             try {
                 /* Get oldest event in queue. */
-                EventMessage eventMessage = queue.take();
+                T event = (T) queue.take();
                 S4Metrics.dequeuedEvent(name);
 
-                @SuppressWarnings("unchecked")
-                T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());
-
                 /* Send event to each target PE. */
                 for (int i = 0; i < targetPEs.length; i++) {
 
@@ -331,4 +330,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
         app.addStream(this);
         return this;
     }
+
+    public Stream<T> setSerializerDeserializerFactory(SerializerDeserializerFactory serDeserFactory) {
+        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index fe3fa80..46abecf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -51,10 +51,9 @@ public class S4Metrics {
     @Inject
     private void init() {
         senderMeters = new Meter[emitter.getPartitionCount()];
-        int localPartitionId = assignment.assignClusterNode().getPartition();
+        // int localPartitionId = assignment.assignClusterNode().getPartition();
         for (int i = 0; i < senderMeters.length; i++) {
-            senderMeters[i] = Metrics.newMeter(Sender.class, "sender", "sent-to-"
-                    + ((i == localPartitionId) ? i + "(local)" : i), TimeUnit.SECONDS);
+            senderMeters[i] = Metrics.newMeter(Sender.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 97b025c..f7236cf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.core.triggers.TriggeredApp;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.MockCommModule;
@@ -79,7 +78,9 @@ public abstract class TriggerTest extends ZkBasedTest {
         CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
 
-        app.stream.receiveEvent(new EventMessage("-1", "stream", app.getSerDeser().serialize(new StringEvent(time1))));
+        StringEvent event = new StringEvent(time1);
+        event.setStreamId("stream");
+        app.stream.receiveEvent(event);
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index 6001595..b9543c8 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -29,7 +29,6 @@ import junit.framework.Assert;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
@@ -88,10 +87,11 @@ public class CheckpointingTest {
         app.start();
 
         Event event = new Event();
+        event.setStreamId("stream1");
         event.put("command", String.class, "setValue1");
         event.put("value", String.class, "message1");
 
-        app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+        app.testStream.receiveEvent(event);
 
         signalValue1Set.await();
 
@@ -102,8 +102,9 @@ public class CheckpointingTest {
 
         // 3. generate a checkpoint event
         event = new Event();
+        event.setStreamId("stream1");
         event.put("command", String.class, "checkpoint");
-        app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
+        app.testStream.receiveEvent(event);
         Assert.assertTrue(signalCheckpointed.await(10, TimeUnit.SECONDS));
 
         // NOTE: the backend has asynchronous save operations
@@ -125,7 +126,7 @@ public class CheckpointingTest {
         idField.setAccessible(true);
         idField.set(refPE, "X");
 
-        byte[] refBytes = app.getSerDeser().serialize(refPE);
+        byte[] refBytes = app.getSerDeser().serialize(refPE).array();
 
         Assert.assertTrue(Arrays.equals(refBytes, Files.toByteArray(expected)));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index c3d30b7..9520831 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -25,8 +25,7 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.Assert;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
@@ -117,9 +116,12 @@ public class FTWordCountTest extends ZkBasedTest {
     private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) {
         Event event;
         event = new Event();
+        event.setStreamId("inputStream");
         event.put("sentence", String.class, sentence);
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
     }
 
     private void restartNode() throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index f2b0297..a9734cf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -27,8 +27,7 @@ import junit.framework.Assert;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.fixtures.CoreTestUtils;
@@ -83,9 +82,12 @@ public class RecoveryTest extends ZkBasedTest {
     private void insertCheckpointInstruction(Injector injector, TCPEmitter emitter) {
         Event event;
         event = new Event();
+        event.setStreamId("inputStream");
         event.put("command", String.class, "checkpoint");
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
     }
 
     private void testCheckpointingConfiguration(Class<?> appClass, Class<?> backendModuleClass,
@@ -117,8 +119,11 @@ public class RecoveryTest extends ZkBasedTest {
         Event event = new Event();
         event.put("command", String.class, "setValue1");
         event.put("value", String.class, "message1");
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        event.setStreamId("inputStream");
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 
         if (manualCheckpointing) {
             insertCheckpointInstruction(injector, emitter);
@@ -140,10 +145,13 @@ public class RecoveryTest extends ZkBasedTest {
         CoreTestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
 
         event = new Event();
+        event.setStreamId("inputStream");
         event.put("command", String.class, "setValue2");
         event.put("value", String.class, "message2");
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 
         Assert.assertTrue(signalValue2Set.await(10, TimeUnit.SECONDS));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
index 903f288..a4b6827 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
@@ -58,7 +57,9 @@ public class MultithreadingTest {
         app.init();
         app.start();
 
-        app.testStream.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(new Event())));
+        Event event = new Event();
+        event.setStreamId(STREAM_NAME);
+        app.testStream.receiveEvent(event);
 
         /*
          * This must raise a timeout, since the onTime() event is blocked waiting for the onEvent() call to finish. If

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
index 78d814a..5d06b28 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
@@ -65,8 +64,9 @@ public class WindowingPETest {
 
         for (int i = 0; i < NB_EVENTS; i++) {
             Event e = new Event();
-            e.put("value", Integer.class, i);
-            app.stream1.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(e)));
+            e.setStreamId(STREAM_NAME);
+            e.put("value", Integer.class, (Integer) i);
+            app.stream1.receiveEvent(e);
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fd20746/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1fbd37a..2d8070c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -34,8 +34,7 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.s4.base.Event;
-import org.apache.s4.base.EventMessage;
-import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -131,9 +130,12 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
 
         Event event = new Event();
+        event.setStreamId("inputStream");
         event.put("line", String.class, time1);
-        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
-                .serialize(event)));
+        emitter.send(
+                0,
+                injector.getInstance(SerializerDeserializerFactory.class)
+                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));


Mime
View raw message