incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [13/15] git commit: Address review comments + provide more sender executors - use a throttling sender by default
Date Fri, 18 Jan 2013 12:16:45 GMT
Address review comments + provide more sender executors
- use a throttling sender by default


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/49dfe9ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/49dfe9ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/49dfe9ba

Branch: refs/heads/dev
Commit: 49dfe9bafeb72a94ff59c2302b522f44641a54a1
Parents: 6fe7ea8
Author: Matthieu Morel <mmorel@apache.org>
Authored: Fri Jan 4 14:38:39 2013 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Fri Jan 4 15:05:26 2013 +0100

----------------------------------------------------------------------
 build.gradle                                       |    1 -
 .../src/main/java/org/apache/s4/base/Emitter.java  |    4 +-
 .../src/main/java/org/apache/s4/base/Event.java    |   11 +++-
 .../main/java/org/apache/s4/base/package-info.java |    2 +-
 subprojects/s4-benchmarks/README.md                |    2 -
 .../s4/benchmark/prodcon/ProducerConsumerApp.java  |    2 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |    4 +-
 .../s4/comm/DeserializerExecutorFactory.java       |    8 ++-
 .../BlockingDeserializerExecutorFactory.java       |   26 ++++++
 .../staging/BlockingThreadPoolExecutorService.java |    6 +-
 .../ThrottlingThreadPoolExecutorService.java       |    2 +-
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |    3 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   16 ++---
 .../src/main/resources/default.s4.comm.properties  |   18 ++++-
 .../java/org/apache/s4/comm/tcp/TCPBasicTest.java  |   43 +++++------
 .../java/org/apache/s4/comm/udp/UDPBasicTest.java  |   29 +++----
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |    4 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    9 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |    2 +-
 .../java/org/apache/s4/core/RemoteSenders.java     |    7 ++-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   13 +++-
 .../src/main/java/org/apache/s4/core/Stream.java   |    3 -
 ...lockingRemoteSendersExecutorServiceFactory.java |    2 +
 .../BlockingSenderExecutorServiceFactory.java      |    2 +
 .../BlockingStreamExecutorServiceFactory.java      |    6 +-
 ...heddingRemoteSendersExecutorServiceFactory.java |   20 +++++
 .../LoadSheddingSenderExecutorServiceFactory.java  |   62 +++++++++++++++
 ...ottlingRemoteSendersExecutorServiceFactory.java |   10 ++-
 .../ThrottlingSenderExecutorServiceFactory.java    |    9 ++
 .../java/org/apache/s4/core/util/S4Metrics.java    |   14 +++-
 .../org/apache/s4/core/ft/FTWordCountTest.java     |    2 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |    2 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    2 +-
 33 files changed, 259 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index aa0bd0a..7d6287f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -147,7 +147,6 @@ subprojects {
         compile(libraries.reflectasm)
         runtime(libraries.objenesis)
         runtime(libraries.kryo)
-        runtime(libraries.reflectasm)
         runtime(libraries.netty)
         runtime(libraries.asm)
         compile(libraries.javax_inject)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index a6ede8f..1f74381 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -34,8 +34,10 @@ public interface Emitter {
      *            - message payload that needs to be sent
      * 
      * @return - true - if message is sent across successfully - false - if send fails
+     * @throws InterruptedException
+     *             if interrupted during blocking send operation
      */
-    boolean send(int partitionId, ByteBuffer message);
+    boolean send(int partitionId, ByteBuffer message) throws InterruptedException;
 
     int getPartitionCount();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 8180180..f4186a8 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Primitives;
 
 /**
  * The base event class in S4. The base class supports generic key/value pairs which us useful for rapid prototyping and
@@ -134,7 +135,15 @@ public class Event {
 
         Data<?> data = map.get(key);
 
-        return (T) data.value;
+        try {
+            return (T) data.value;
+        } catch (ClassCastException e) {
+            if (!Primitives.wrap(type).isAssignableFrom(Primitives.wrap(data.type))) {
+                logger.error("Trying to get a value of type {} for an attribute of type {}.", type, data.type);
+                return null;
+            }
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
index d068a66..f046e96 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
@@ -18,7 +18,7 @@
 
 /**
  * Defines some of the basic elements of the S4 platforms.
- * 
+ *
  * 
  * 
  */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index 9847b84..1839df3 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -58,8 +58,6 @@ Exmample configuration files are available in `/config` and you can configure :
 
 The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
 
-The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
-
 By default in this example the size of a message is 188 bytes.
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
index 6465cf5..72eefa4 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
@@ -52,7 +52,7 @@ public class ProducerConsumerApp extends App {
             public List<String> get(Event event) {
                 return ImmutableList.of(event.get("key"));
             }
-        }, simplePE1).setParallelism(1);
+        }, simplePE1).setParallelism(3);
 
         SimplePE2 simplePE2 = createPE(SimplePE2.class, "simplePE2");
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index a0612cf..a295290 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,7 +31,7 @@ import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -102,7 +102,7 @@ public class DefaultCommModule extends AbstractModule {
 
         bind(RemoteEmitters.class);
 
-        bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
+        bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
 
         try {
             Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
index b1cee4d..40492fd 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
@@ -11,8 +11,14 @@ import java.util.concurrent.Executor;
  * There are many possible implementations, that may consider various factors, in particular:
  * <ul>
  * <li>parallelism
- * <li>memory usage
+ * <li>memory usage (directly measured, or inferred from the number of buffered events)
  * <li>sharing threadpool among channel workers
+ * </ul>
+ * <p>
+ * When related thresholds are reached, deserializer executors may:
+ * <ul>
+ * <li>block: this indirectly blocks the reception of messages for this channel, applying upstream backpressure.
+ * <li>drop messages: a form of load shedding
  * 
  * 
  */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingDeserializerExecutorFactory.java
new file mode 100644
index 0000000..da2297a
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingDeserializerExecutorFactory.java
@@ -0,0 +1,26 @@
+package org.apache.s4.comm.staging;
+
+import java.util.concurrent.Executor;
+
+import org.apache.s4.comm.DeserializerExecutorFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Executors factory for the deserialization stage that blocks incoming tasks when the work queue is full.
+ * 
+ */
+public class BlockingDeserializerExecutorFactory implements DeserializerExecutorFactory {
+
+    @Named("s4.listener.maxEventsPerDeserializer")
+    @Inject(optional = true)
+    protected int maxEventsPerDeserializer = 100000;
+
+    @Override
+    public Executor create() {
+        return new BlockingThreadPoolExecutorService(1, false, "deserializer-%d", maxEventsPerDeserializer, getClass()
+                .getClassLoader());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
index f3f6c9d..319d382 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
@@ -18,6 +18,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -101,8 +102,8 @@ public class BlockingThreadPoolExecutorService extends ForwardingListeningExecut
         try {
             queueingPermits.acquire();
         } catch (InterruptedException e) {
-            e.printStackTrace();
             Thread.currentThread().interrupt();
+            return Futures.immediateFailedCheckedFuture(e);
         }
         ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
         return future;
@@ -114,6 +115,7 @@ public class BlockingThreadPoolExecutorService extends ForwardingListeningExecut
             queueingPermits.acquire();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+            return Futures.immediateFailedCheckedFuture(e);
         }
         ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
         return future;
@@ -125,6 +127,7 @@ public class BlockingThreadPoolExecutorService extends ForwardingListeningExecut
             queueingPermits.acquire();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+            return Futures.immediateFailedCheckedFuture(e);
         }
         ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
         return future;
@@ -135,7 +138,6 @@ public class BlockingThreadPoolExecutorService extends ForwardingListeningExecut
         try {
             queueingPermits.acquire();
         } catch (InterruptedException e) {
-            e.printStackTrace();
             Thread.currentThread().interrupt();
         }
         super.execute(new RunnableWithPermitRelease(command));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
index 2b01969..0b10590 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
@@ -44,7 +44,7 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
      *            Maximum number of threads in the pool
      * @param fairParallelism
      *            If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
-     *            can be help ensure ordering, though there is an associated performance cost (typically small).
+     *            can help ensure ordering, though there is an associated performance cost (typically small).
      * @param threadName
      *            Naming scheme
      * @param workQueueSize

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index 63aa1f0..4b3040d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -47,6 +47,9 @@ public class RemoteEmitters {
             emitter = emitters.putIfAbsent(topology, newEmitter);
             if (emitter == null) {
                 emitter = newEmitter;
+            } else {
+                // use the existing emitter instead
+                newEmitter.close();
             }
         }
         return emitter;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 0bcce32..fd0ad2b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -159,7 +159,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     }
 
-    private boolean connectTo(Integer partitionId) {
+    private boolean connectTo(Integer partitionId) throws InterruptedException {
         ClusterNode clusterNode = partitionNodeMap.get(partitionId);
 
         if (clusterNode == null) {
@@ -181,12 +181,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         } catch (InterruptedException ie) {
             logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
                     clusterNode.getPort()));
-            Thread.currentThread().interrupt();
+            throw ie;
         }
         return false;
     }
 
-    private void sendMessage(int partitionId, ByteBuffer message) {
+    private void sendMessage(int partitionId, ByteBuffer message) throws InterruptedException {
         ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
 
         if (!partitionChannelMap.containsKey(partitionId)) {
@@ -197,12 +197,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             }
         }
 
-        try {
-            writePermits.get(partitionId).acquire();
-        } catch (InterruptedException e) {
-            logger.error("Interrupted while acquiring permit", e);
-            Thread.currentThread().interrupt();
-        }
+        writePermits.get(partitionId).acquire();
 
         Channel c = partitionChannelMap.get(partitionId);
         if (c == null) {
@@ -214,7 +209,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     }
 
     @Override
-    public boolean send(int partitionId, ByteBuffer message) {
+    public boolean send(int partitionId, ByteBuffer message) throws InterruptedException {
         // TODO a possible optimization would be to buffer messages per partition, with a small timeout. This will limit
         // the number of writes and therefore system calls.
         sendMessage(partitionId, message);
@@ -240,6 +235,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     @Override
     public void close() {
         try {
+            topology.removeListener(this);
             channels.close().await();
             bootstrap.releaseExternalResources();
         } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 7d1270b..fad7ec4 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -8,12 +8,26 @@ s4.cluster.zk_address = localhost:2181
 s4.cluster.zk_session_timeout = 10000
 s4.cluster.zk_connection_timeout = 10000
 
+
+# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure
+
 # how many threads to use for the sender stage (i.e. serialization)
 s4.sender.parallelism=1
 # maximum number of events in the buffer of the sender stage
-s4.sender.workQueueSize=100000
+s4.sender.workQueueSize=10000
+# maximum sending rate from a given node, in events / s (used with throttling sender executors)
+s4.sender.maxRate=200000
+
+# how many threads to use for the *remote* sender stage (i.e. serialization)
+s4.remoteSender.parallelism=1
+# maximum number of events in the buffer of the *remote* sender stage
+s4.remoteSender.workQueueSize=10000
+# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
+s4.remoteSender.maxRate=200000
+
 # maximum number of pending writes to a given comm channel
 s4.emitter.maxPendingWrites=1000
 
 # maximum number of events in the buffer of the processing stage
-s4.stream.workQueueSize=100000
\ No newline at end of file
+s4.stream.workQueueSize=10000
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index f43d61f..a4bd8a2 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -22,29 +22,24 @@ import com.google.inject.Injector;
 public class TCPBasicTest extends ZkBasedTest {
 
     @Test
-    public void testSingleMessage() {
-
-        try {
-            Injector injector1 = Guice
-                    .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                            .openStream(), "cluster1"), new TCPTransportModule(), new NoOpReceiverModule());
-            Emitter emitter = injector1.getInstance(Emitter.class);
-
-            Injector injector2 = Guice
-                    .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                            .openStream(), "cluster1"), new TCPTransportModule(), new MockReceiverModule());
-            // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
-            // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
-            injector2.getInstance(Listener.class);
-
-            emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
-
-            // check receiver got the message
-            Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
-        }
+    public void testSingleMessage() throws Exception {
+
+        Injector injector1 = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new TCPTransportModule(), new NoOpReceiverModule());
+        Emitter emitter = injector1.getInstance(Emitter.class);
+
+        Injector injector2 = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new TCPTransportModule(), new MockReceiverModule());
+        // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+        // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
+        injector2.getInstance(Listener.class);
+
+        emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+        // check receiver got the message
+        Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index 42bdf91..47cb39c 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -22,28 +22,21 @@ import com.google.inject.Injector;
 public class UDPBasicTest extends ZkBasedTest {
 
     @Test
-    public void testSingleMessage() {
+    public void testSingleMessage() throws Exception {
 
-        try {
-            Injector injector1 = Guice.createInjector(
-                    new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
-                    new UDPTransportModule(), new NoOpReceiverModule());
-            Emitter emitter = injector1.getInstance(Emitter.class);
+        Injector injector1 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+                .openStream(), "cluster1"), new UDPTransportModule(), new NoOpReceiverModule());
+        Emitter emitter = injector1.getInstance(Emitter.class);
 
-            Injector injector2 = Guice.createInjector(
-                    new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
-                    new UDPTransportModule(), new MockReceiverModule());
-            // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
-            // listener)
-            injector2.getInstance(Listener.class);
+        Injector injector2 = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+                .openStream(), "cluster1"), new UDPTransportModule(), new MockReceiverModule());
+        // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+        // listener)
+        injector2.getInstance(Listener.class);
 
-            emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+        emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
 
-            Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+        Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 89f4982..466bd66 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -21,6 +21,8 @@ package org.apache.s4.fixtures;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 
+import junit.framework.Assert;
+
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -51,7 +53,7 @@ public abstract class ZkBasedTest {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 logger.error("Uncaught error in thread {}: {}", t.getName(), e);
-
+                Assert.fail("Uncaught error in thread " + t.getName() + " : " + e.getMessage());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index cf17f36..56d2f35 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -30,11 +30,11 @@ import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
-import org.apache.s4.core.staging.LoadSheddingStreamExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
@@ -85,10 +85,11 @@ public class DefaultCoreModule extends AbstractModule {
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 
-        bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
+        // by default, throttle (and thus shed load in) the local sender only; remote senders block
+        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
         bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
 
-        bind(StreamExecutorServiceFactory.class).to(LoadSheddingStreamExecutorServiceFactory.class);
+        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 088c7bf..dba8b6c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -43,7 +43,7 @@ public class RemoteSender {
 
     }
 
-    public void send(String hashKey, ByteBuffer message) {
+    public void send(String hashKey, ByteBuffer message) throws InterruptedException {
         int partition;
         if (hashKey == null) {
             // round robin by default

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 71a91e9..7c7238f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -110,7 +110,12 @@ public class RemoteSenders {
 
         @Override
         public void run() {
-            sender.send(hashKey, serDeser.serialize(event));
+            try {
+                sender.send(hashKey, serDeser.serialize(event));
+            } catch (InterruptedException e) {
+                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+                Thread.currentThread().interrupt();
+            }
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 03799f6..d2e731d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -131,7 +131,12 @@ public class SenderImpl implements Sender {
         @Override
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
-            emitter.send(remotePartitionId, serializedEvent);
+            try {
+                emitter.send(remotePartitionId, serializedEvent);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+                Thread.currentThread().interrupt();
+            }
 
         }
 
@@ -154,7 +159,11 @@ public class SenderImpl implements Sender {
 
                 /* Don't use the comm layer when we send to the same partition. */
                 if (localPartitionId != i) {
-                    emitter.send(i, serializedEvent);
+                    try {
+                        emitter.send(i, serializedEvent);
+                    } catch (InterruptedException e) {
+                        logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+                    }
                     metrics.sentEvent(i);
 
                 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 1de77bb..00137b8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -230,9 +230,6 @@ public class Stream<T extends Event> implements Streamable {
      */
     public void receiveEvent(Event event) {
         // NOTE: ArrayBlockingQueue.size is O(1).
-        // if (taskQueue.remainingCapacity() == 0) {
-        // S4Metrics.queueIsFull(name);
-        // }
 
         eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
         // TODO abstraction around queue and add dropped counter

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
index 5180062..df89d68 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
@@ -6,6 +6,8 @@ import com.google.inject.name.Named;
 /**
  * Blocking implementation of the remote senders executor factory. It clones the implementation of the
  * {@link BlockingSenderExecutorServiceFactory} class.
+ * <p>
+ * This is convenient for sending events through adapters.
  * 
  */
 public class BlockingRemoteSendersExecutorServiceFactory extends BlockingSenderExecutorServiceFactory implements

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
index aad3a19..3ab73a2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
@@ -10,6 +10,8 @@ import com.google.inject.name.Named;
 /**
  * Blocking factory implementation for the sender executor service. It uses a mechanism that blocks the submission of
  * events when the work queue is full.
+ * <p>
+ * Beware that this can lead to deadlocks if processing queues are full on all nodes.
  * 
  */
 public class BlockingSenderExecutorServiceFactory implements SenderExecutorServiceFactory {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
index 940a178..a77a35e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
@@ -7,6 +7,10 @@ import org.apache.s4.comm.staging.BlockingThreadPoolExecutorService;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
+/**
+ * Factory for stream executors that block when task queue is full.
+ * 
+ */
 public class BlockingStreamExecutorServiceFactory implements StreamExecutorServiceFactory {
 
     private final int workQueueSize;
@@ -18,7 +22,7 @@ public class BlockingStreamExecutorServiceFactory implements StreamExecutorServi
 
     @Override
     public ExecutorService create(int parallelism, String name, ClassLoader classLoader) {
-        return new BlockingThreadPoolExecutorService(1, false, name, workQueueSize, classLoader);
+        return new BlockingThreadPoolExecutorService(parallelism, false, name, workQueueSize, classLoader);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingRemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..ae508bb
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingRemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.staging;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Load shedding implementation of the remote senders executor factory. It clones the implementation of the
+ * {@link LoadSheddingSenderExecutorServiceFactory} class.
+ * 
+ */
+public class LoadSheddingRemoteSendersExecutorServiceFactory extends LoadSheddingSenderExecutorServiceFactory implements
+        RemoteSendersExecutorServiceFactory {
+
+    @Inject
+    public LoadSheddingRemoteSendersExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        super(threadPoolSize, workQueueSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingSenderExecutorServiceFactory.java
new file mode 100644
index 0000000..56fd8fb
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingSenderExecutorServiceFactory.java
@@ -0,0 +1,62 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.util.S4Metrics;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
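+ * Factory for sender executors that drop events when their bounded work queue is full (typically because the
+ * communication channel cannot absorb events as fast as they are submitted). Dropped events are counted in metrics.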
+ */
+public class LoadSheddingSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
+
+    private final int workQueueSize;
+
+    @Inject
+    private S4Metrics metrics;
+
+    private final int parallelism;
+
+    @Inject
+    public LoadSheddingSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        this.workQueueSize = workQueueSize;
+        this.parallelism = threadPoolSize;
+    }
+
+    @Override
+    public ExecutorService create() {
+        boolean remote = (this instanceof RemoteSendersExecutorServiceFactory);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat(remote ? "remote-sender-%d" : "sender-%d").build();
+
+        RejectedExecutionHandler rejectedExecutionHandler = (remote ? new RejectedExecutionHandler() {
+
+            // from a remote sender
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                metrics.droppedEventInRemoteSender();
+            }
+        } : new RejectedExecutionHandler() {
+
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                metrics.droppedEventInSender();
+            }
+        });
+        ThreadPoolExecutor tpe = new ThreadPoolExecutor(parallelism, parallelism, 60, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(workQueueSize), threadFactory, rejectedExecutionHandler);
+        tpe.allowCoreThreadTimeOut(true);
+        return tpe;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
index 20b7cb9..8b1c4f2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
@@ -3,12 +3,18 @@ package org.apache.s4.core.staging;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
+/**
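+ * Throttling implementation of the remote senders executor factory. It reuses the implementation of the
+ * {@link ThrottlingSenderExecutorServiceFactory} class, configured through the <code>s4.remoteSender.*</code>
+ * parameters (maxRate, parallelism, workQueueSize).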
+ */
 public class ThrottlingRemoteSendersExecutorServiceFactory extends ThrottlingSenderExecutorServiceFactory implements
         RemoteSendersExecutorServiceFactory {
 
     @Inject
-    public ThrottlingRemoteSendersExecutorServiceFactory(@Named("s4.sender.maxRate") int maxRate,
-            @Named("s4.sender.parallelism") int threadPoolSize, @Named("s4.sender.workQueueSize") int workQueueSize) {
+    public ThrottlingRemoteSendersExecutorServiceFactory(@Named("s4.remoteSender.maxRate") int maxRate,
+            @Named("s4.remoteSender.parallelism") int threadPoolSize,
+            @Named("s4.remoteSender.workQueueSize") int workQueueSize) {
         super(maxRate, threadPoolSize, workQueueSize);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
index 7e44928..c3eabb3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
@@ -8,6 +8,15 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
+/**
+ * 
+ * Factory for sender executors that limits submission rate.
+ * <p>
+ * If the specified rate is higher than what can be sent through the communication channel, events are dropped in a LIFO
+ * order (and logged through a corresponding meter).
+ * 
+ * 
+ */
 public class ThrottlingSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
 
     private final int maxRate;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index de8897c..0b81bbe 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -50,6 +50,10 @@ public class S4Metrics {
     private final Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
     private final Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
     private final Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
+    private final Meter droppedInSenderMeter = Metrics.newMeter(SenderImpl.class, "dropped@sender", "dropped@sender",
+            TimeUnit.SECONDS);
+    private final Meter droppedInRemoteSenderMeter = Metrics.newMeter(SenderImpl.class, "dropped@remote-sender",
+            "dropped@remote-sender", TimeUnit.SECONDS);
 
     private final Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
 
@@ -61,13 +65,11 @@ public class S4Metrics {
             senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
         }
         Metrics.newGauge(Stream.class, "local-vs-remote", new Gauge<Double>() {
-
             @Override
             public Double value() {
                 // this will return NaN if divider is zero
                 return localEventsMeter.oneMinuteRate() / remoteEventsMeter.oneMinuteRate();
             }
-
         });
 
     }
@@ -118,6 +120,14 @@ public class S4Metrics {
         }
     }
 
+    public void droppedEventInSender() {
+        droppedInSenderMeter.mark();
+    }
+
+    public void droppedEventInRemoteSender() {
+        droppedInRemoteSenderMeter.mark();
+    }
+
     public void sentLocal() {
         localEventsMeter.mark();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 9520831..2b3f8c7 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -113,7 +113,7 @@ public class FTWordCountTest extends ZkBasedTest {
 
     }
 
-    private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) {
+    private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) throws InterruptedException {
         Event event;
         event = new Event();
         event.setStreamId("inputStream");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index a9734cf..da2bf5d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -79,7 +79,7 @@ public class RecoveryTest extends ZkBasedTest {
                 CheckpointingModuleWithUnrespondingFetchingStorageBackend.class, false, "value1= ; value2=message2");
     }
 
-    private void insertCheckpointInstruction(Injector injector, TCPEmitter emitter) {
+    private void insertCheckpointInstruction(Injector injector, TCPEmitter emitter) throws InterruptedException {
         Event event;
         event = new Event();
         event.setStreamId("inputStream");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dfe9ba/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index f0a18f2..8ee58e5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -111,7 +111,7 @@ public class WordCountTest extends ZkBasedTest {
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
     }
 
-    public void injectSentence(String sentence) throws IOException {
+    public void injectSentence(String sentence) throws IOException, InterruptedException {
         Event event = new Event();
         event.setStreamId("inputStream");
         event.put("sentence", String.class, sentence);


Mime
View raw message