incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [7/15] Performance improvements and configurability - clearly identify asynchronous stages and use configurable and injectable executors for each of them (deserialization, processing, serialization) - default executors for processing/sending use throttli
Date Fri, 18 Jan 2013 12:16:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
deleted file mode 100644
index e744d39..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.util;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * Test util for communication protocols.
- * 
- * <ul>
- * <li>The util defines Send and Receive Threads</li>
- * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
- * <li>ReceiveThread receives all/most of these messages</li>
- * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
- * long enough interval</li>
- * </ul>
- * 
- */
-public class PartitionInfo {
-    private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
-    public Emitter emitter;
-    public Listener listener;
-    public SendThread sendThread;
-    public ReceiveThread receiveThread;
-
-    private final int numRetries = 10;
-    private final int retryDelayMs = 10;
-    private int numMessages = 100;
-    private int partitionId;
-    private ProtocolTestUtil ptu;
-
-    SerializerDeserializer serDeser;
-
-    @Inject
-    public PartitionInfo(Emitter emitter, Listener listener, SerializerDeserializerFactory serDeserFactory) {
-        this.emitter = emitter;
-        this.listener = listener;
-        this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
-        this.partitionId = this.listener.getPartitionId();
-        logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
-                this.listener.getPartitionId());
-
-        // this.messagesExpected = numMessages * this.emitter.getPartitionCount();
-
-        this.sendThread = new SendThread();
-        this.receiveThread = new ReceiveThread();
-    }
-
-    public void setProtocolTestUtil(ProtocolTestUtil ptu) {
-        this.ptu = ptu;
-        this.ptu.expectedMessages[partitionId] = numMessages * this.emitter.getPartitionCount();
-    }
-
-    public class SendThread extends Thread {
-        public int[] sendCounts = new int[emitter.getPartitionCount()];
-
-        public SendThread() {
-            super("SendThread-" + partitionId);
-        }
-
-        @Override
-        public void run() {
-            try {
-                for (int i = 0; i < numMessages; i++) {
-                    for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                        for (int retries = 0; retries < numRetries; retries++) {
-                            if (emitter.send(partition, ByteBuffer.wrap(new String(partitionId + " " + i).getBytes()))) {
-                                sendCounts[partition]++;
-                                break;
-                            }
-                            Thread.sleep(retryDelayMs);
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                return;
-            }
-
-            for (int i = 0; i < sendCounts.length; i++) {
-                if (sendCounts[i] < numMessages) {
-                    ptu.decreaseExpectedMessages(i, (numMessages - sendCounts[i]));
-                }
-            }
-
-            logger.debug("Exiting");
-        }
-    }
-
-    public class ReceiveThread extends Thread {
-        protected int messagesReceived = 0;
-        private Hashtable<Integer, List<Integer>> receivedMessages;
-
-        ReceiveThread() {
-            super("ReceiveThread-" + partitionId);
-            receivedMessages = new Hashtable<Integer, List<Integer>>();
-        }
-
-        @Override
-        public void run() {
-            while (messagesReceived < ptu.expectedMessages[partitionId]) {
-                ByteBuffer message = listener.recv();
-                if (message == null) {
-                    logger.error("ReceiveThread {}: received a null message", partitionId);
-                    break;
-                }
-
-                // process and store the message
-                String msgString = new String(message.array());
-                String[] msgTokens = msgString.split(" ");
-                Integer senderPartition = Integer.parseInt(msgTokens[0]);
-                Integer receivedMsg = Integer.parseInt(msgTokens[1]);
-
-                if (!receivedMessages.containsKey(senderPartition)) {
-                    receivedMessages.put(senderPartition, new ArrayList<Integer>());
-                }
-
-                List<Integer> messagesList = receivedMessages.get(senderPartition);
-
-                if (messagesList.contains(receivedMsg)) {
-                    messagesList.remove(receivedMsg);
-                } else {
-                    messagesReceived++;
-                }
-                messagesList.add(receivedMsg);
-            }
-
-            logger.debug("Exiting");
-        }
-
-        public boolean orderedDelivery() {
-            for (List<Integer> messagesList : receivedMessages.values()) {
-                int lastMsg = -1;
-                for (Integer msg : messagesList) {
-                    if (msg <= lastMsg) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        }
-
-        public boolean messageDelivery() {
-            if (messagesReceived < ptu.expectedMessages[partitionId]) {
-                printCounts();
-                return false;
-            } else
-                return true;
-        }
-
-        public void printCounts() {
-            logger.debug("ReceiveThread {}: Messages not received = {}", partitionId,
-                    (ptu.expectedMessages[partitionId] - messagesReceived));
-            int counts[] = new int[emitter.getPartitionCount()];
-            for (Integer sender : receivedMessages.keySet()) {
-                counts[sender] = receivedMessages.get(sender).size();
-            }
-
-            logger.debug("ReceiveThread {}: recvdCounts: {}", partitionId, counts);
-        }
-
-        public int moreMessages() {
-            return (int) (ptu.expectedMessages[partitionId] - messagesReceived);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
deleted file mode 100644
index c647611..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.util;
-
-import java.io.IOException;
-
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Injector;
-
-public abstract class ProtocolTestUtil extends ZkBasedTest {
-    protected int[] expectedMessages;
-    protected PartitionInfo[] partitions;
-
-    protected ProtocolTestUtil() {
-        super();
-    }
-
-    protected ProtocolTestUtil(int numTasks) {
-        super(numTasks);
-    }
-
-    @Before
-    public void preparePartitions() throws IOException, InterruptedException, KeeperException {
-        expectedMessages = new int[super.numTasks];
-        partitions = new PartitionInfo[super.numTasks];
-        for (int i = 0; i < this.numTasks; i++) {
-            partitions[i] = newInjector().getInstance(PartitionInfo.class);
-            partitions[i].setProtocolTestUtil(this);
-        }
-    }
-
-    protected abstract Injector newInjector() throws IOException;
-
-    protected void decreaseExpectedMessages(int partition, long amount) {
-        synchronized (expectedMessages) {
-            expectedMessages[partition] -= amount;
-        }
-
-        if (partitions[partition].receiveThread.messagesReceived >= expectedMessages[partition])
-            interrupt(partition);
-    }
-
-    protected void interrupt(int partition) {
-        partitions[partition].receiveThread.interrupt();
-    }
-
-    protected void startThreads() {
-        for (PartitionInfo partition : partitions) {
-            partition.sendThread.start();
-            partition.receiveThread.start();
-        }
-    }
-
-    protected void waitForThreads() throws InterruptedException {
-        for (PartitionInfo partition : partitions) {
-            partition.sendThread.join();
-            partition.receiveThread.join();
-        }
-    }
-
-    protected boolean messageDelivery() {
-        for (PartitionInfo partition : partitions) {
-            if (!partition.receiveThread.messageDelivery())
-                return false;
-        }
-        return true;
-    }
-
-    protected boolean messageOrdering() {
-        for (PartitionInfo partition : partitions) {
-            if (!partition.receiveThread.orderedDelivery())
-                return false;
-        }
-        return true;
-    }
-
-    @After
-    public void tearDown() {
-        for (PartitionInfo partition : partitions) {
-            // debug
-            partition.receiveThread.printCounts();
-            if (partition.emitter != null) {
-                partition.emitter.close();
-                partition.emitter = null;
-            }
-            if (partition.listener != null) {
-                partition.listener.close();
-                partition.listener = null;
-            }
-        }
-    }
-
-    @Test(timeout = 60000)
-    public abstract void testDelivery() throws InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 7316498..5d3967f 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -63,6 +63,9 @@ public class CommTestUtils {
     static {
         logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
     }
+    public final static String MESSAGE = "message@" + System.currentTimeMillis();
+
+    public final static CountDownLatch SIGNAL_MESSAGE_RECEIVED = new CountDownLatch(1);
 
     protected static Process forkProcess(String mainClass, int debugPort, String... args) throws IOException,
             InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
new file mode 100644
index 0000000..5dde4c2
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
@@ -0,0 +1,38 @@
+package org.apache.s4.fixtures;
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.Inject;
+
+/**
+ * For test purposes, intercepts messages that would normally be delegated to the application layer.
+ * 
+ */
+public class MockReceiver implements Receiver {
+
+    SerializerDeserializer serDeser;
+
+    @Inject
+    public MockReceiver(SerializerDeserializer serDeser) {
+        super();
+        this.serDeser = serDeser;
+    }
+
+    @Override
+    public void receive(ByteBuffer message) {
+        if (CommTestUtils.MESSAGE.equals(serDeser.deserialize(message))) {
+            CommTestUtils.SIGNAL_MESSAGE_RECEIVED.countDown();
+        } else {
+            System.err.println("Unexpected message:" + serDeser.deserialize(message));
+        }
+
+    }
+
+    @Override
+    public int getPartitionId() {
+        throw new RuntimeException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
new file mode 100644
index 0000000..2a5d0a8
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+/**
+ * For test purposes, intercepts messages that would normally be delegated to the application layer.
+ * 
+ */
+public class MockReceiverModule extends AbstractModule {
+
+    @Provides
+    public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+        // we use the current classloader here, no app class to serialize
+        return serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+    }
+
+    @Override
+    protected void configure() {
+        bind(Receiver.class).to(MockReceiver.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
new file mode 100644
index 0000000..01acdd1
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
@@ -0,0 +1,22 @@
+package org.apache.s4.fixtures;
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.Receiver;
+
+/**
+ * Avoids delegating message processing to the application layer.
+ * 
+ */
+class NoOpReceiver implements Receiver {
+
+    @Override
+    public void receive(ByteBuffer message) {
+        // do nothing
+    }
+
+    @Override
+    public int getPartitionId() {
+        throw new RuntimeException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
new file mode 100644
index 0000000..2ee3749
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+/**
+ * Avoids delegating message processing to the application layer.
+ * 
+ */
+public class NoOpReceiverModule extends AbstractModule {
+
+    @Provides
+    public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+        // we use the current classloader here, no app class to serialize
+        return serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+    }
+
+    @Override
+    protected void configure() {
+        bind(Receiver.class).to(NoOpReceiver.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
index 7b525bd..f9707ec 100644
--- a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@ -5,3 +5,10 @@ s4.cluster.name = cluster1
 s4.cluster.zk_address = localhost:2181
 s4.cluster.zk_session_timeout = 10000
 s4.cluster.zk_connection_timeout = 10000
+
+# how many threads to use for the sender stage (i.e. serialization)
+s4.sender.parallelism=1
+# maximum number of events in the buffer of the sender stage
+s4.sender.workQueueSize=10000
+# maximum number of events in the buffer of the processing stage
+s4.stream.workQueueSize=10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 7675e15..4a3693b 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -27,6 +27,7 @@ dependencies {
     compile libraries.asm
     compile libraries.netty
     compile libraries.zkclient
+    compile libraries.reflectasm
     testCompile project(path: ':s4-comm', configuration: 'tests')
     testCompile libraries.gradle_base_services
     testCompile libraries.gradle_core

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 43d137f..eb2ae1d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,10 +26,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.KeyFinder;
+import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
 import org.apache.s4.core.window.SlotFactory;
 import org.slf4j.Logger;
@@ -65,7 +67,7 @@ public abstract class App {
     @Inject
     private Sender sender;
     @Inject
-    private Receiver receiver;
+    private ReceiverImpl receiver;
 
     @Inject
     RemoteSenders remoteSenders;
@@ -90,6 +92,9 @@ public abstract class App {
     private SerializerDeserializer serDeser;
 
     @Inject
+    StreamExecutorServiceFactory streamExecutorFactory;
+
+    @Inject
     private void initSerDeser() {
         this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
     }
@@ -272,7 +277,7 @@ public abstract class App {
     /**
      * @return the receiver object
      */
-    public Receiver getReceiver() {
+    public ReceiverImpl getReceiver() {
         return receiver;
     }
 
@@ -284,6 +289,10 @@ public abstract class App {
         return checkpointingFramework;
     }
 
+    public StreamExecutorServiceFactory getStreamExecutorFactory() {
+        return streamExecutorFactory;
+    }
+
     /**
      * Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
      * prototypes by key.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 8874b93..5de792e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,10 +25,18 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.staging.DefaultRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.core.util.S4Metrics;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
@@ -72,6 +80,9 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
+        bind(Receiver.class).to(ReceiverImpl.class);
+        bind(Sender.class).to(SenderImpl.class);
+
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
         bind(S4RLoaderFactory.class);
@@ -81,6 +92,12 @@ public class DefaultCoreModule extends AbstractModule {
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 
         bind(S4Metrics.class);
+
+        bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+        bind(RemoteSendersExecutorServiceFactory.class).to(DefaultRemoteSendersExecutorServiceFactory.class);
+
+        bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
+
     }
 
     private void loadProperties(Binder binder) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index b1a0c10..295e416 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -55,7 +55,6 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
 
 /**
  * <p>
@@ -441,7 +440,7 @@ public abstract class ProcessingElement implements Cloneable {
 
     protected void handleInputEvent(Event event) {
 
-        TimerContext timerContext = processingTimer.time();
+        // TimerContext timerContext = processingTimer.time();
         Object object;
         if (isThreadSafe) {
             object = new Object(); // a dummy object TODO improve this.
@@ -470,7 +469,7 @@ public abstract class ProcessingElement implements Cloneable {
                 checkpoint();
             }
         }
-        timerContext.stop();
+        // timerContext.stop();
     }
 
     protected boolean isCheckpointable() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
deleted file mode 100644
index 7a9b5f2..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.core.util.S4Metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.MapMaker;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-/**
- * The {@link Receiver} and its counterpart {@link Sender} are the top level classes of the communication layer.
- * <p>
- * {@link Receiver} is responsible for receiving an event to a {@link ProcessingElement} instance using a hashKey.
- * <p>
- * A Listener implementation receives data from the network and passes an event as a byte array to the {@link Receiver}.
- * The byte array is de-serialized and converted into an {@link Event}. Finally the event is passed to the matching
- * streams.
- * </p>
- * There is a single {@link Receiver} instance per node.
- * 
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer. </p>
- */
-@Singleton
-public class Receiver implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
-
-    final private Listener listener;
-    final private SerializerDeserializer serDeser;
-    private Map<Integer, Map<String, Stream<? extends Event>>> streams;
-    private Thread thread;
-
-    @Inject
-    public Receiver(Listener listener, SerializerDeserializer serDeser) {
-        this.listener = listener;
-        this.serDeser = serDeser;
-
-        thread = new Thread(this, "Receiver");
-        // TODO avoid starting the thread here
-        thread.start();
-
-        streams = new MapMaker().makeMap();
-    }
-
-    public int getPartition() {
-        return listener.getPartitionId();
-    }
-
-    /** Save stream keyed by app id and stream id. */
-    void addStream(Stream<? extends Event> stream) {
-        int appId = stream.getApp().getId();
-        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
-        if (appMap == null) {
-            appMap = new MapMaker().makeMap();
-            streams.put(appId, appMap);
-        }
-        appMap.put(stream.getName(), stream);
-    }
-
-    /** Remove stream when it is no longer needed. */
-    void removeStream(Stream<? extends Event> stream) {
-        int appId = stream.getApp().getId();
-        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
-        if (appMap == null) {
-            logger.error("Tried to remove a stream that is not registered in the receiver.");
-            return;
-        }
-        appMap.remove(stream.getName());
-    }
-
-    public void run() {
-        // TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener
-        // here?
-        while (!Thread.interrupted()) {
-            ByteBuffer message = listener.recv();
-            S4Metrics.receivedEvent(message.array().length);
-            Event event = (Event) serDeser.deserialize(message);
-
-            String streamId = event.getStreamName();
-
-            /*
-             * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
-             * make this more efficient for the case in which we send the same event to multiple PEs.
-             */
-            try {
-                streams.get(-1).get(streamId).receiveEvent(event);
-            } catch (NullPointerException e) {
-                logger.error("Could not find target stream for event with streamId={}", streamId);
-            }
-        }
-    }
-
-    public void close() {
-        thread.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
new file mode 100644
index 0000000..476f4ac
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.core.util.S4Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MapMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link ReceiverImpl} and its counterpart {@link SenderImpl} are the top level classes of the communication layer.
+ * <p>
+ * {@link ReceiverImpl} is responsible for passing each received event to the {@link Stream} matching its stream name.
+ * <p>
+ * A Listener implementation receives data from the network and passes an event as a byte array to the
+ * {@link ReceiverImpl}. The byte array is de-serialized and converted into an {@link Event}. Finally the event is
+ * passed to the matching streams.
+ * </p>
+ * There is a single {@link ReceiverImpl} instance per node.
+ * <p>
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer. </p>
+ */
+@Singleton
+public class ReceiverImpl implements Receiver {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReceiverImpl.class);
+
+    final private Listener listener;
+    final private SerializerDeserializer serDeser;
+    private Map<Integer, Map<String, Stream<? extends Event>>> streams;
+
+    @Inject
+    public ReceiverImpl(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+        this.listener = listener;
+        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+
+        streams = new MapMaker().makeMap();
+    }
+
+    @Override
+    public int getPartitionId() {
+        return listener.getPartitionId();
+    }
+
+    /** Save stream keyed by app id and stream id. */
+    void addStream(Stream<? extends Event> stream) {
+        int appId = stream.getApp().getId();
+        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
+        if (appMap == null) {
+            appMap = new MapMaker().makeMap();
+            streams.put(appId, appMap);
+        }
+        appMap.put(stream.getName(), stream);
+    }
+
+    /** Remove stream when it is no longer needed. */
+    void removeStream(Stream<? extends Event> stream) {
+        int appId = stream.getApp().getId();
+        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
+        if (appMap == null) {
+            logger.error("Tried to remove a stream that is not registered in the receiver.");
+            return;
+        }
+        appMap.remove(stream.getName());
+    }
+
+    @Override
+    public void receive(ByteBuffer message) {
+        S4Metrics.receivedEventFromCommLayer(message.array().length);
+        Event event = (Event) serDeser.deserialize(message);
+
+        String streamId = event.getStreamName();
+
+        /*
+         * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO: make
+         * this more efficient for the case in which we send the same event to multiple PEs.
+         */
+        try {
+            Map<String, Stream<? extends Event>> map = streams.get(-1);
+            map.get(streamId).receiveEvent(event);
+        } catch (NullPointerException e) {
+            logger.error("Could not find target stream for event with streamId={}", streamId);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 9c9e09c..b6a7d5b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -21,6 +21,7 @@ package org.apache.s4.core;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
@@ -30,6 +31,7 @@ import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,25 +46,29 @@ public class RemoteSenders {
 
     Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
 
-    RemoteEmitters remoteEmitters;
+    final RemoteEmitters remoteEmitters;
 
-    RemoteStreams remoteStreams;
+    final RemoteStreams remoteStreams;
 
-    Clusters remoteClusters;
+    final Clusters remoteClusters;
 
-    SerializerDeserializer serDeser;
+    final SerializerDeserializer serDeser;
 
-    Hasher hasher;
+    final Hasher hasher;
 
     ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
 
+    private ExecutorService executorService;
+
     @Inject
     public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
-            SerializerDeserializerFactory serDeserFactory, Hasher hasher) {
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
         this.remoteEmitters = remoteEmitters;
         this.remoteStreams = remoteStreams;
         this.remoteClusters = remoteClusters;
         this.hasher = hasher;
+        executorService = senderExecutorFactory.create();
 
         serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
     }
@@ -84,8 +90,26 @@ public class RemoteSenders {
                     sender = newSender;
                 }
             }
-            // we must set the app id of the consumer app for correct dispatch within the consumer node
             // NOTE: this implies multiple serializations, there might be an optimization
+            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
+        }
+    }
+
+    class SendToRemoteClusterTask implements Runnable {
+
+        String hashKey;
+        Event event;
+        RemoteSender sender;
+
+        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
+            super();
+            this.hashKey = hashKey;
+            this.event = event;
+            this.sender = sender;
+        }
+
+        @Override
+        public void run() {
             sender.send(hashKey, serDeser.serialize(event));
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
deleted file mode 100644
index a308a15..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.nio.ByteBuffer;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Event;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.core.util.S4Metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * The {@link Sender} and its counterpart {@link Receiver} are the top level classes of the communication layer.
- * <p>
- * {@link Sender} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
- * <p>
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
- */
-public class Sender {
-
-    private static Logger logger = LoggerFactory.getLogger(Sender.class);
-
-    final private Emitter emitter;
-    final private SerializerDeserializer serDeser;
-    final private Hasher hasher;
-
-    Assignment assignment;
-    private int localPartitionId = -1;
-
-    /**
-     * 
-     * @param emitter
-     *            the emitter implements the low level communication layer.
-     * @param serDeser
-     *            a serialization mechanism.
-     * @param hasher
-     *            a hashing function to map keys to partition IDs.
-     */
-    @Inject
-    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
-        this.emitter = emitter;
-        this.serDeser = serDeser;
-        this.hasher = hasher;
-        this.assignment = assignment;
-    }
-
-    @Inject
-    private void resolveLocalPartitionId() {
-        ClusterNode node = assignment.assignClusterNode();
-        if (node != null) {
-            localPartitionId = node.getPartition();
-        }
-    }
-
-    /**
-     * This method attempts to send an event to a remote partition. If the destination is local, the method does not
-     * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
-     * 
-     * @param hashKey
-     *            the string used to map the value of a key to a specific partition.
-     * @param event
-     *            the event to be delivered to a {@link ProcessingElement} instance.
-     * @return true if the event was sent because the destination is <b>not</b> local.
-     * 
-     */
-    public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
-        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
-        if (partition == localPartitionId) {
-            /* Hey we are in the same JVM, don't use the network. */
-            return false;
-        }
-        send(partition, serDeser.serialize(event));
-        S4Metrics.sentEvent(partition);
-        return true;
-    }
-
-    private void send(int partition, ByteBuffer message) {
-
-        emitter.send(partition, message);
-    }
-
-    /**
-     * Send an event to all the remote partitions in the cluster. The caller is expected to also put the event in a
-     * local queue.
-     * 
-     * @param event
-     *            the event to be delivered to {@link ProcessingElement} instances.
-     */
-    public void sendToRemotePartitions(Event event) {
-
-        for (int i = 0; i < emitter.getPartitionCount(); i++) {
-
-            /* Don't use the comm layer when we send to the same partition. */
-            if (localPartitionId != i) {
-                emitter.send(i, serDeser.serialize(event));
-                S4Metrics.sentEvent(i);
-
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
new file mode 100644
index 0000000..255cdeb
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Sender;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.util.S4Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+/**
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
+ * <p>
+ * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
+ * <p>
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer.
+ */
+public class SenderImpl implements Sender {
+
+    private static Logger logger = LoggerFactory.getLogger(SenderImpl.class);
+
+    final private Emitter emitter;
+    final private SerializerDeserializer serDeser;
+    final private Hasher hasher;
+
+    Assignment assignment;
+    private int localPartitionId = -1;
+
+    private ExecutorService tpe;
+
+    /**
+     * 
+     * @param emitter
+     *            the emitter implements the low level communication layer.
+     * @param serDeserFactory
+     *            a factory for the serialization mechanism.
+     * @param hasher
+     *            a hashing function to map keys to partition IDs.
+     */
+    @Inject
+    public SenderImpl(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            Assignment assignment, SenderExecutorServiceFactory senderExecutorServiceFactory) {
+        this.emitter = emitter;
+        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+        this.hasher = hasher;
+        this.assignment = assignment;
+        this.tpe = senderExecutorServiceFactory.create();
+    }
+
+    @Inject
+    private void resolveLocalPartitionId() {
+        ClusterNode node = assignment.assignClusterNode();
+        if (node != null) {
+            localPartitionId = node.getPartition();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.base.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
+     */
+    @Override
+    public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
+        int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+        if (partition == localPartitionId) {
+            /* Hey we are in the same JVM, don't use the network. */
+            return false;
+        }
+        // TODO asynch
+        send(partition, serDeser.serialize(event));
+        S4Metrics.sentEvent(partition);
+        return true;
+    }
+
+    private void send(int partition, ByteBuffer message) {
+
+        emitter.send(partition, message);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.base.Sender#sendToAllRemotePartitions(org.apache.s4.base.Event)
+     */
+    @Override
+    public void sendToAllRemotePartitions(Event event) {
+        tpe.submit(new SerializeAndSendToAllRemotePartitionsTask(event));
+
+    }
+
+    class SerializeAndSendToRemotePartitionTask implements Runnable {
+        Event event;
+        int remotePartitionId;
+
+        public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
+            this.event = event;
+            this.remotePartitionId = remotePartitionId;
+        }
+
+        @Override
+        public void run() {
+            ByteBuffer serializedEvent = serDeser.serialize(event);
+            emitter.send(remotePartitionId, serializedEvent);
+
+        }
+
+    }
+
+    class SerializeAndSendToAllRemotePartitionsTask implements Runnable {
+
+        Event event;
+
+        public SerializeAndSendToAllRemotePartitionsTask(Event event) {
+            super();
+            this.event = event;
+        }
+
+        @Override
+        public void run() {
+            ByteBuffer serializedEvent = serDeser.serialize(event);
+
+            for (int i = 0; i < emitter.getPartitionCount(); i++) {
+
+                /* Don't use the comm layer when we send to the same partition. */
+                if (localPartitionId != i) {
+                    emitter.send(i, serializedEvent);
+                    S4Metrics.sentEvent(i);
+
+                }
+            }
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 53ddc7a..94b19b5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -19,13 +19,14 @@
 package org.apache.s4.core;
 
 import java.util.Collection;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.base.GenericKeyFinder;
 import org.apache.s4.base.Key;
 import org.apache.s4.base.KeyFinder;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.core.util.S4Metrics;
@@ -42,25 +43,29 @@ import com.google.common.base.Preconditions;
  * <p>
  * To build an application, create stream objects using relevant methods in the {@link App} class.
  */
-public class Stream<T extends Event> implements Runnable, Streamable {
+public class Stream<T extends Event> implements Streamable {
 
     private static final Logger logger = LoggerFactory.getLogger(Stream.class);
 
     final static private String DEFAULT_SEPARATOR = "^";
-    final static private int CAPACITY = 1000;
+    final static private int CAPACITY = 100000;
     private static int idCounter = 0;
     private String name;
     protected Key<T> key;
     private ProcessingElement[] targetPEs;
-    protected final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
-    private Thread thread;
+    // protected final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
+    // final BlockingQueue<StreamEventProcessingTask> taskQueue = new ArrayBlockingQueue<StreamEventProcessingTask>(
+    // CAPACITY);
+    private Executor eventProcessingExecutor;
     final private Sender sender;
-    final private Receiver receiver;
+    final private ReceiverImpl receiver;
     // final private int id;
     final private App app;
     private Class<T> eventType = null;
     SerializerDeserializer serDeser;
 
+    private int parallelism = 1;
+
     /**
      * Send events using a {@link KeyFinder}. The key finder extracts the value of the key which is used to determine
      * the target {@link org.apache.s4.comm.topology.ClusterNode} for an event.
@@ -84,10 +89,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
             }
         }
 
-        /* Start streaming. */
-        thread = new Thread(this, name);
-        thread.setContextClassLoader(getApp().getClass().getClassLoader());
-        thread.start();
+        eventProcessingExecutor = app.getStreamExecutorFactory().create(parallelism, name,
+                app.getClass().getClassLoader());
+
         this.receiver.addStream(this);
     }
 
@@ -100,6 +104,14 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      */
     public Stream<T> setName(String name) {
         this.name = name;
+        // Metrics.newGauge(getClass(), "stream-size-" + name, new Gauge<Integer>() {
+        //
+        // @Override
+        // public Integer value() {
+        // return taskQueue.size();
+        // }
+        // });
+
         return this;
     }
 
@@ -174,61 +186,55 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      */
     @SuppressWarnings("unchecked")
     public void put(Event event) {
-        try {
-            event.setStreamId(getName());
-            event.setAppId(app.getId());
+        event.setStreamId(getName());
+        event.setAppId(app.getId());
+
+        /*
+         * Events may be sent to local or remote partitions or both. The following code implements the logic.
+         */
+        if (key != null) {
 
             /*
-             * Events may be sent to local or remote partitions or both. The following code implements the logic.
+             * We send to a specific PE instance using the key but we don't know if the target partition is remote or
+             * local. We need to ask the sender.
              */
-            if (key != null) {
+            if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
 
                 /*
-                 * We send to a specific PE instance using the key but we don't know if the target partition is remote
-                 * or local. We need to ask the sender.
+                 * Sender checked and decided that the target is local so we simply put the event in the queue and we
+                 * save the trip over the network.
                  */
-                if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
-
-                    /*
-                     * Sender checked and decided that the target is local so we simply put the event in the queue and
-                     * we save the trip over the network.
-                     */
-                    // TODO no need to serialize for local queue
-                    queue.put(event);
-                }
+                eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+            }
 
-            } else {
+        } else {
 
-                /*
-                 * We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every
-                 * node. The sender method takes care of the remote partitions an we take care of putting the event into
-                 * the queue.
-                 */
-                sender.sendToRemotePartitions(event);
+            /*
+             * We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every node.
+             * The sender method takes care of the remote partitions and we take care of putting the event into the
+             * queue.
+             */
+            sender.sendToAllRemotePartitions(event);
 
-                // TODO no need to serialize for local queue
-                queue.put(event);
-                // TODO abstraction around queue and add dropped counter
-                // TODO add counter for local events
+            // now send to local queue
+            eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+            // TODO abstraction around queue and add dropped counter
+            // TODO add counter for local events
 
-            }
-        } catch (InterruptedException e) {
-            logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            Thread.currentThread().interrupt();
         }
     }
 
     /**
-     * The low level {@link Receiver} object call this method when a new {@link Event} is available.
+     * The low level {@link ReceiverImpl} object calls this method when a new {@link Event} is available.
      */
     public void receiveEvent(Event event) {
-        try {
-            queue.put(event);
-            // TODO abstraction around queue and add dropped counter
-        } catch (InterruptedException e) {
-            logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            Thread.currentThread().interrupt();
-        }
+        // NOTE: ArrayBlockingQueue.size is O(1).
+        // if (taskQueue.remainingCapacity() == 0) {
+        // S4Metrics.queueIsFull(name);
+        // }
+
+        eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+        // TODO abstraction around queue and add dropped counter
     }
 
     /**
@@ -263,7 +269,6 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      * Stop and close this stream.
      */
     public void close() {
-        thread.interrupt();
     }
 
     /**
@@ -280,59 +285,82 @@ public class Stream<T extends Event> implements Runnable, Streamable {
         return receiver;
     }
 
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                /* Get oldest event in queue. */
-                T event = (T) queue.take();
-                S4Metrics.dequeuedEvent(name);
+    public Stream<T> register() {
+        app.addStream(this);
+        return this;
+    }
+
+    public Stream<T> setSerializerDeserializerFactory(SerializerDeserializerFactory serDeserFactory) {
+        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+        return this;
+    }
+
+    /**
+     * <p>
+     * Defines the maximum number of concurrent threads that should be used for processing events for this stream.
+     * Threads will only be created as necessary, up to the specified maximum.
+     * </p>
+     * <p>
+     * Default is 1 (i.e. with default stream executor service, this corresponds to asynchronous processing, but no
+     * parallelism)
+     * </p>
+     * <p>
+     * It might be useful to increase parallelism when:
+     * <ul>
+     * <li>Processing elements handling events for this stream are CPU bound</li>
+     * <li>Processing elements handling events for this stream use blocking I/O operations</li>
+     * </ul>
+     * <p>
+     * 
+     * 
+     */
+    public Stream<T> setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
 
-                /* Send event to each target PE. */
-                for (int i = 0; i < targetPEs.length; i++) {
+    class StreamEventProcessingTask implements Runnable {
 
-                    if (key == null) {
+        T event;
 
-                        /* Broadcast to all PE instances! */
+        public StreamEventProcessingTask(T event) {
+            this.event = event;
+        }
 
-                        /* STEP 1: find all PE instances. */
+        @Override
+        public void run() {
+            S4Metrics.dequeuedEvent(name);
 
-                        Collection<ProcessingElement> pes = targetPEs[i].getInstances();
+            /* Send event to each target PE. */
+            for (int i = 0; i < targetPEs.length; i++) {
 
-                        /* STEP 2: iterate and pass event to PE instance. */
-                        for (ProcessingElement pe : pes) {
+                if (key == null) {
 
-                            pe.handleInputEvent(event);
-                        }
+                    /* Broadcast to all PE instances! */
 
-                    } else {
+                    /* STEP 1: find all PE instances. */
 
-                        /* We have a key, send to target PE. */
+                    Collection<ProcessingElement> pes = targetPEs[i].getInstances();
 
-                        /* STEP 1: find the PE instance for key. */
-                        ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
+                    /* STEP 2: iterate and pass event to PE instance. */
+                    for (ProcessingElement pe : pes) {
 
-                        /* STEP 2: pass event to PE instance. */
                         pe.handleInputEvent(event);
                     }
-                }
 
-            } catch (InterruptedException e) {
-                logger.info("Closing stream {}.", name);
-                receiver.removeStream(this);
-                Thread.currentThread().interrupt();
-                return;
+                } else {
+
+                    /* We have a key, send to target PE. */
+
+                    /* STEP 1: find the PE instance for key. */
+                    ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
+
+                    /* STEP 2: pass event to PE instance. */
+                    pe.handleInputEvent(event);
+                }
             }
-        }
-    }
 
-    public Stream<T> register() {
-        app.addStream(this);
-        return this;
-    }
+        }
 
-    public Stream<T> setSerializerDeserializerFactory(SerializerDeserializerFactory serDeserFactory) {
-        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
-        return this;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index ca23c79..3e2d617 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -22,12 +22,13 @@ import com.google.inject.AbstractModule;
 
 /**
  * Checkpointing module that uses the {@link DefaultFileSystemStateStorage} as a checkpointing backend.
- *
+ * 
  */
 public class FileSystemBackendCheckpointingModule extends AbstractModule {
     @Override
     protected void configure() {
         bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
         bind(CheckpointingFramework.class).to(SafeKeeper.class);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..b47087f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.staging;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Default implementation of the remote senders executor factory. It clones the implementation of the
+ * {@link DefaultSenderExecutorServiceFactory} class.
+ * 
+ */
+public class DefaultRemoteSendersExecutorServiceFactory extends DefaultSenderExecutorServiceFactory implements
+        RemoteSendersExecutorServiceFactory {
+
+    @Inject
+    public DefaultRemoteSendersExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        super(threadPoolSize, workQueueSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
new file mode 100644
index 0000000..344bd59
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Default factory implementation for the sender executor service. It uses a mechanism for throttling the submission of
+ * events and maintaining partial order.
+ * 
+ */
+public class DefaultSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
+
+    private int threadPoolSize;
+    private int workQueueSize;
+
+    @Inject
+    public DefaultSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        this.threadPoolSize = threadPoolSize;
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create() {
+        return new ThrottlingThreadPoolExecutorService(threadPoolSize, true,
+                (this instanceof DefaultRemoteSendersExecutorServiceFactory) ? "remote-sender-%d" : "sender-%d",
+                workQueueSize, getClass().getClassLoader());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
new file mode 100644
index 0000000..e0b52a9
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Default factory for the event processing stage executors.
+ * </p>
+ * <p>
+ * It provides optional parallelism, when the processing activity requires blocking I/O operations, or is CPU-bound.
+ * </p>
+ * <p>
+ * It throttles the submission of events while preserving partial ordering.
+ * </p>
+ * 
+ */
+public class DefaultStreamProcessingExecutorServiceFactory implements StreamExecutorServiceFactory {
+
+    private int workQueueSize;
+
+    @Inject
+    public DefaultStreamProcessingExecutorServiceFactory(@Named("s4.stream.workQueueSize") int workQueueSize) {
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create(int parallelism, String name, final ClassLoader classLoader) {
+        return new ThrottlingThreadPoolExecutorService(parallelism, true, "stream-" + name + "-%d", workQueueSize,
+                classLoader);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..a3fd4dc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,12 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Defines an executor factory for the stage responsible for sending events to remote logical clusters.
+ * 
+ */
+public interface RemoteSendersExecutorServiceFactory {
+
+    ExecutorService create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
new file mode 100644
index 0000000..d7393f3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Defines a factory that creates executors for the stage responsible for the serialization of events and delegation to
+ * emitters in the communication layer.
+ * 
+ */
+public interface SenderExecutorServiceFactory {
+
+    ExecutorService create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
new file mode 100644
index 0000000..db2df27
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
@@ -0,0 +1,29 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.core.App;
+
+/**
+ * Factory for creating an executor service that will process events in PEs. This is typically done asynchronously with
+ * a configurable thread pool.
+ * <p>
+ * Implementations may use dependency injection to set some default parameters.
+ * </p>
+ */
+public interface StreamExecutorServiceFactory {
+
+    /**
+     * Creates the executor service for a given stream.
+     * 
+     * @param parallelism
+     *            Number of concurrent threads
+     * @param name
+     *            Name of the stream (for naming threads)
+     * @param classLoader
+     *            Classloader used for specifying the context classloader in processing threads. This is usually the
+     *            classloader that loaded the {@link App} class.
+     * @return Executor service for processing events in PEs
+     */
+    ExecutorService create(int parallelism, String name, ClassLoader classLoader);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 46abecf..27ac387 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -7,9 +7,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Receiver;
+import org.apache.s4.core.ReceiverImpl;
 import org.apache.s4.core.RemoteSender;
-import org.apache.s4.core.Sender;
+import org.apache.s4.core.SenderImpl;
 import org.apache.s4.core.Stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,15 +36,16 @@ public class S4Metrics {
 
     static List<Meter> partitionSenderMeters = Lists.newArrayList();
 
-    private static Meter eventMeter = Metrics.newMeter(Receiver.class, "received-events", "event-count",
+    private static Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
             TimeUnit.SECONDS);
-    private static Meter bytesMeter = Metrics.newMeter(Receiver.class, "received-bytes", "bytes-count",
+    private static Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
             TimeUnit.SECONDS);
 
     private static Meter[] senderMeters;
 
     private static Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
     private static Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
+    private static Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
 
     private static Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
 
@@ -53,7 +54,7 @@ public class S4Metrics {
         senderMeters = new Meter[emitter.getPartitionCount()];
         // int localPartitionId = assignment.assignClusterNode().getPartition();
         for (int i = 0; i < senderMeters.length; i++) {
-            senderMeters[i] = Metrics.newMeter(Sender.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
+            senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
         }
     }
 
@@ -83,11 +84,16 @@ public class S4Metrics {
         });
     }
 
-    public static void receivedEvent(int bytes) {
+    public static void receivedEventFromCommLayer(int bytes) {
         eventMeter.mark();
         bytesMeter.mark(bytes);
     }
 
+    public static void queueIsFull(String name) {
+        streamQueueFullMeters.get(name).mark();
+
+    }
+
     public static void sentEvent(int partition) {
         try {
             senderMeters[partition].mark();
@@ -103,7 +109,8 @@ public class S4Metrics {
         dequeuingStreamMeters.put(name,
                 Metrics.newMeter(Stream.class, "dequeued@" + name, "dequeued", TimeUnit.SECONDS));
         droppedStreamMeters.put(name, Metrics.newMeter(Stream.class, "dropped@" + name, "dropped", TimeUnit.SECONDS));
-
+        streamQueueFullMeters.put(name,
+                Metrics.newMeter(Stream.class, "stream-full@" + name, "stream-full", TimeUnit.SECONDS));
     }
 
     public static void dequeuedEvent(String name) {
@@ -153,4 +160,5 @@ public class S4Metrics {
             fetchedCheckpointFailure.mark();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index e0bd874..377de0b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -166,6 +166,7 @@ public class CheckpointingTest {
             bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
             bind(CheckpointingFramework.class).to(SafeKeeper.class);
             bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
index b1676dc..6d358ce 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -29,7 +29,6 @@ import org.apache.s4.core.App;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
 import org.apache.s4.core.window.DefaultAggregatingSlot;
 import org.apache.s4.core.window.SlotFactory;
-import org.apache.s4.core.windowing.WindowingPETest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 71bae7a..02fd694 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,7 +20,12 @@ package org.apache.s4.fixtures;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
-import org.apache.s4.core.Receiver;
+import org.apache.s4.base.Sender;
+import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
 import org.mockito.Mockito;
@@ -28,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
 
 /**
  * Core module mocking basic platform functionalities.
@@ -46,6 +52,19 @@ public class MockCoreModule extends AbstractModule {
         bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
         bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
         bind(Listener.class).toInstance(Mockito.mock(Listener.class));
-        bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+        bind(ReceiverImpl.class).toInstance(Mockito.mock(ReceiverImpl.class));
+        bind(Sender.class).toInstance(Mockito.mock(Sender.class));
+
+        // Although we want to mock as much as possible, most tests still require the machinery for routing events
+        // within a stream/node, therefore sender and stream executors are not mocked
+        bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
+
+        bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+
+        bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
+        bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);
+
+        bind(Integer.class).annotatedWith(Names.named("s4.stream.workQueueSize")).toInstance(10000);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index 0209933..1ea3a2a 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -21,9 +21,10 @@ package org.apache.s4.example.counter;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.base.Sender;
 import org.apache.s4.core.App;
-import org.apache.s4.core.Receiver;
-import org.apache.s4.core.Sender;
+import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.core.SenderImpl;
 import org.apache.s4.core.Stream;
 
 import com.google.inject.Guice;
@@ -117,8 +118,8 @@ final public class MyApp extends App {
 
         Injector injector = Guice.createInjector(new Module());
         MyApp myApp = injector.getInstance(MyApp.class);
-        Sender sender = injector.getInstance(Sender.class);
-        Receiver receiver = injector.getInstance(Receiver.class);
+        Sender sender = injector.getInstance(SenderImpl.class);
+        ReceiverImpl receiver = injector.getInstance(ReceiverImpl.class);
         // myApp.setCommLayer(sender, receiver);
         myApp.init();
         myApp.start();


Mime
View raw message