incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [12/15] git commit: Throttling and shedding improvements - use a load shedding stream executor by default (drops incoming events when local stream queue is full) - use a blocking sender executor by default - provide throttling executors with limit rate
Date Fri, 18 Jan 2013 12:16:46 GMT
Throttling and shedding improvements
- use a load shedding stream executor by default (drops incoming events when local stream queue is full)
- use a blocking sender executor by default
- provide throttling executors with limit rate


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6fe7ea8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6fe7ea8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6fe7ea8a

Branch: refs/heads/dev
Commit: 6fe7ea8a11e2fe59948b8d3b4f8ed95cf254e901
Parents: ab3ac77
Author: Matthieu Morel <mmorel@apache.org>
Authored: Thu Dec 6 15:37:32 2012 +0100
Committer: Matthieu Morel <mmorel@apache.org>
Committed: Sat Dec 8 14:15:14 2012 +0100

----------------------------------------------------------------------
 build.gradle                                       |    2 +-
 subprojects/s4-benchmarks/README.md                |    7 +-
 subprojects/s4-benchmarks/config/injector.config   |    4 +
 .../s4/benchmark/utils/InjectionLimiterModule.java |   15 +
 .../staging/BlockingThreadPoolExecutorService.java |  212 +++++++++++++++
 ...haredThrottlingDeserializerExecutorFactory.java |   36 ---
 .../ThrottlingThreadPoolExecutorService.java       |  112 ++------
 .../src/main/resources/default.s4.comm.properties  |    4 +-
 .../main/java/org/apache/s4/core/AppModule.java    |    3 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |   12 +-
 ...lockingRemoteSendersExecutorServiceFactory.java |   20 ++
 .../BlockingSenderExecutorServiceFactory.java      |   33 +++
 .../BlockingStreamExecutorServiceFactory.java      |   24 ++
 ...DefaultRemoteSendersExecutorServiceFactory.java |   20 --
 .../DefaultSenderExecutorServiceFactory.java       |   34 ---
 ...aultStreamProcessingExecutorServiceFactory.java |   37 ---
 .../LoadSheddingStreamExecutorServiceFactory.java  |   63 +++++
 .../core/staging/StreamExecutorServiceFactory.java |    4 +-
 ...ottlingRemoteSendersExecutorServiceFactory.java |   15 +
 .../ThrottlingSenderExecutorServiceFactory.java    |   34 +++
 .../java/org/apache/s4/core/util/S4Metrics.java    |    4 +
 .../org/apache/s4/fixtures/MockCoreModule.java     |   10 +-
 22 files changed, 470 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e1b981d..aa0bd0a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -53,7 +53,7 @@ allprojects {
 
 /* All project libraries must be defined here. */
 project.ext["libraries"] = [
-    guava:              'com.google.guava:guava:12.0.1',
+    guava:              'com.google.guava:guava:13.0.1',
     gson:               'com.google.code.gson:gson:1.6',
     guice:              'com.google.inject:guice:3.0',
     aop_alliance:       'aopalliance:aopalliance:1.0',

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index 93062f6..9847b84 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -53,6 +53,10 @@ Exmample configuration files are available in `/config` and you can configure :
 - the number of events between making a pause in the injection
 - the duration of the pause (can be 0)
 - etc…
+- It is also possible to limit the injection rate by using and configuring the InjectionLimiterModule class. Parameters are `s4.sender.maxRate` and `s4.sender.warmupPeriod`.
+
+
+The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
 
 The total number of events sent from an injector is `number of keys * number of test iterations * number of parallel injection threads`. Make sure this is significant in order to be able to correctly interpret the messaging rates (1000 would be too little for instance!).
 
@@ -70,6 +74,7 @@ For a distributed setup, you should modify the host names in the above command l
 Here is an example for driving the test on a cluster:
 `incubator-s4/subprojects/s4-benchmarks/bench-cluster.sh "processingHost1 processingHost2 processingHost3" "injectorConfigStream1.cfg injectorConfigStream2.cfg" node.cfg 2 "injectorHost1 injectorHost2 injectorHost3 injectorHost4"` (the `2` controls the number of injectors per stream per injector host)
 
+
 ## Results
 
 
@@ -83,7 +88,7 @@ You may also check that all events have been processed:
 
 * each injector reports how many events it sent on which stream
 * each node reports the total number of events received
-* you should get `total injected from all injectors = total received in all nodes` (minus events sent through internal streams in the app, if that applies)
+* depending on the injection rate, you can see how many events have been dropped, if any: `total injected from all injectors >= total received in all nodes` (minus events sent through internal streams in the app, if that applies)
 
 
 ## Notes

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-benchmarks/config/injector.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/injector.config b/subprojects/s4-benchmarks/config/injector.config
index e02587c..08c89ef 100644
--- a/subprojects/s4-benchmarks/config/injector.config
+++ b/subprojects/s4-benchmarks/config/injector.config
@@ -1,3 +1,7 @@
 -c=testCluster1
 -appClass=org.apache.s4.benchmark.utils.Injector
 -p=s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=10000,s4.benchmark.testIterations=1000000,s4.benchmark.injector.iterationsBeforePause=1000,s4.benchmark.pauseTimeMs=20,s4.benchmark.injector.parallelism=2
+
+# the following controls the injection rate, with a maximum of 20000 events / s / sender 
+#-p=s4.sender.maxRate=20000,s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=10000,s4.benchmark.testIterations=1000000,s4.benchmark.injector.iterationsBeforePause=1000,s4.benchmark.pauseTimeMs=0,s4.benchmark.injector.parallelism=2
+#-emc=org.apache.s4.benchmark.utils.InjectionLimiterModule

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java
new file mode 100644
index 0000000..886fd32
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java
@@ -0,0 +1,15 @@
+package org.apache.s4.benchmark.utils;
+
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.ThrottlingRemoteSendersExecutorServiceFactory;
+
+import com.google.inject.AbstractModule;
+
+public class InjectionLimiterModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(RemoteSendersExecutorServiceFactory.class).to(ThrottlingRemoteSendersExecutorServiceFactory.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..f3f6c9d
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
@@ -0,0 +1,212 @@
+package org.apache.s4.comm.staging;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This thread pool executor throttles the submission of new tasks by using a semaphore. Task submissions require
+ * permits, task completions release permits.
+ * <p>
+ * NOTE: you should either use the {@link BlockingThreadPoolExecutorService#submit(java.util.concurrent.Callable)}
+ * methods or the {@link BlockingThreadPoolExecutorService#execute(Runnable)} method.
+ * 
+ */
+public class BlockingThreadPoolExecutorService extends ForwardingListeningExecutorService {
+
+    private static Logger logger = LoggerFactory.getLogger(BlockingThreadPoolExecutorService.class);
+
+    int parallelism;
+    String streamName;
+    final ClassLoader classLoader;
+    int workQueueSize;
+    private BlockingQueue<Runnable> workQueue;
+    private Semaphore queueingPermits;
+    private ListeningExecutorService executorDelegatee;
+
+    /**
+     * 
+     * @param parallelism
+     *            Maximum number of threads in the pool
+     * @param fairParallelism
+     *            If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
+     *            can be help ensure ordering, though there is an associated performance cost (typically small).
+     * @param threadName
+     *            Naming scheme
+     * @param workQueueSize
+     *            Queue capacity
+     * @param classLoader
+     *            ClassLoader used as contextClassLoader for spawned threads
+     */
+    public BlockingThreadPoolExecutorService(int parallelism, boolean fairParallelism, String threadName,
+            int workQueueSize, final ClassLoader classLoader) {
+        super();
+        this.parallelism = parallelism;
+        this.streamName = threadName;
+        this.classLoader = classLoader;
+        this.workQueueSize = workQueueSize;
+        queueingPermits = new Semaphore(workQueueSize + parallelism, false);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName)
+                .setThreadFactory(new ThreadFactory() {
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setContextClassLoader(classLoader);
+                        return t;
+                    }
+                }).build();
+        // queueingPermits semaphore controls the size of the queue, thus no need to use a bounded queue
+        workQueue = new LinkedBlockingQueue<Runnable>(workQueueSize + parallelism);
+        ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(parallelism, parallelism, 60,
+                TimeUnit.SECONDS, workQueue, threadFactory, new RejectedExecutionHandler() {
+
+                    @Override
+                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                        // This is not expected to happen.
+                        logger.error("Could not submit task to executor {}", executor.toString());
+                    }
+                });
+        eventProcessingExecutor.allowCoreThreadTimeOut(true);
+        executorDelegatee = MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+    }
+
+    @Override
+    protected ListeningExecutorService delegate() {
+        return executorDelegatee;
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
+        return future;
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
+        return future;
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
+        return future;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+        super.execute(new RunnableWithPermitRelease(command));
+    }
+
+    /**
+     * Releases a permit after the task is executed
+     * 
+     */
+    class RunnableWithPermitRelease implements Runnable {
+
+        Runnable delegatee;
+
+        public RunnableWithPermitRelease(Runnable delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public void run() {
+            try {
+                delegatee.run();
+            } finally {
+                queueingPermits.release();
+            }
+
+        }
+    }
+
+    /**
+     * Releases a permit after the task is completed
+     * 
+     */
+    class CallableWithPermitRelease<T> implements Callable<T> {
+
+        Callable<T> delegatee;
+
+        public CallableWithPermitRelease(Callable<T> delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                return delegatee.call();
+            } finally {
+                queueingPermits.release();
+            }
+        }
+
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new RuntimeException("Not implemented");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
deleted file mode 100644
index 48ff4f9..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.s4.comm.staging;
-
-import java.util.concurrent.Executor;
-
-import org.apache.s4.comm.DeserializerExecutorFactory;
-import org.slf4j.LoggerFactory;
-
-/**
- * A factory for deserializer executors that returns a unique thread pool, shared among channels. This can be useful
- * when there are many inbound channels and that you need to limit the number of threads in the node.
- * 
- */
-public class SharedThrottlingDeserializerExecutorFactory implements DeserializerExecutorFactory {
-
-    private static final int QUEUE_CAPACITY = 100000;
-
-    enum ThrottlingExecutorSingleton {
-        INSTANCE;
-
-        ThrottlingThreadPoolExecutorService executor;
-
-        private ThrottlingExecutorSingleton() {
-            this.executor = new ThrottlingThreadPoolExecutorService(Runtime.getRuntime().availableProcessors(), true,
-                    "listener-deserializer-%d", QUEUE_CAPACITY, Thread.currentThread().getContextClassLoader());
-        }
-    }
-
-    @Override
-    public Executor create() {
-        LoggerFactory
-                .getLogger(getClass())
-                .info("Creating a shared (i.e. singleton, shared across netty channel workers) throttling thread pool with queue capacity of {}",
-                        QUEUE_CAPACITY);
-        return ThrottlingExecutorSingleton.INSTANCE.executor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
index a805f21..2b01969 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
@@ -2,13 +2,12 @@ package org.apache.s4.comm.staging;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -21,16 +20,11 @@ import com.google.common.util.concurrent.ForwardingListeningExecutorService;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
 
-/**
- * This thread pool executor throttles the submission of new tasks by using a semaphore. Task submissions require
- * permits, task completions release permits.
- * <p>
- * NOTE: you should either use the {@link ThrottlingThreadPoolExecutorService#submit(java.util.concurrent.Callable)}
- * methods or the {@link ThrottlingThreadPoolExecutorService#execute(Runnable)} method.
- * 
- */
 public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExecutorService {
 
     private static Logger logger = LoggerFactory.getLogger(ThrottlingThreadPoolExecutorService.class);
@@ -40,8 +34,9 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
     final ClassLoader classLoader;
     int workQueueSize;
     private BlockingQueue<Runnable> workQueue;
-    private Semaphore queueingPermits;
+    private RateLimiter rateLimitedPermits;
     private ListeningExecutorService executorDelegatee;
+    Meter droppingMeter;
 
     /**
      * 
@@ -57,14 +52,15 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
      * @param classLoader
      *            ClassLoader used as contextClassLoader for spawned threads
      */
-    public ThrottlingThreadPoolExecutorService(int parallelism, boolean fairParallelism, String threadName,
-            int workQueueSize, final ClassLoader classLoader) {
+    public ThrottlingThreadPoolExecutorService(int parallelism, int rate, String threadName, int workQueueSize,
+            final ClassLoader classLoader) {
         super();
         this.parallelism = parallelism;
         this.streamName = threadName;
         this.classLoader = classLoader;
         this.workQueueSize = workQueueSize;
-        queueingPermits = new Semaphore(workQueueSize + parallelism, false);
+        this.droppingMeter = Metrics.newMeter(getClass(), "throttling-dropping", "throttling-dropping",
+                TimeUnit.SECONDS);
         ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName)
                 .setThreadFactory(new ThreadFactory() {
 
@@ -75,15 +71,14 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
                         return t;
                     }
                 }).build();
-        // queueingPermits semaphore controls the size of the queue, thus no need to use a bounded queue
-        workQueue = new LinkedBlockingQueue<Runnable>(workQueueSize + parallelism);
+        rateLimitedPermits = RateLimiter.create(rate);
+        workQueue = new ArrayBlockingQueue<Runnable>(workQueueSize + parallelism);
         ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(parallelism, parallelism, 60,
                 TimeUnit.SECONDS, workQueue, threadFactory, new RejectedExecutionHandler() {
 
                     @Override
                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-                        // This is not expected to happen.
-                        logger.error("Could not submit task to executor {}", executor.toString());
+                        droppingMeter.mark();
                     }
                 });
         eventProcessingExecutor.allowCoreThreadTimeOut(true);
@@ -98,93 +93,29 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
 
     @Override
     public <T> ListenableFuture<T> submit(Callable<T> task) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
+        rateLimitedPermits.acquire();
+        ListenableFuture<T> future = super.submit(task);
         return future;
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
+        rateLimitedPermits.acquire();
+        ListenableFuture<T> future = super.submit(task, result);
         return future;
     }
 
     @Override
     public ListenableFuture<?> submit(Runnable task) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
+        rateLimitedPermits.acquire();
+        ListenableFuture<?> future = super.submit(task);
         return future;
     }
 
     @Override
     public void execute(Runnable command) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            Thread.currentThread().interrupt();
-        }
-        super.execute(new RunnableWithPermitRelease(command));
-    }
-
-    /**
-     * Releases a permit after the task is executed
-     * 
-     */
-    class RunnableWithPermitRelease implements Runnable {
-
-        Runnable delegatee;
-
-        public RunnableWithPermitRelease(Runnable delegatee) {
-            this.delegatee = delegatee;
-        }
-
-        @Override
-        public void run() {
-            try {
-                delegatee.run();
-            } finally {
-                queueingPermits.release();
-            }
-
-        }
-    }
-
-    /**
-     * Releases a permit after the task is completed
-     * 
-     */
-    class CallableWithPermitRelease<T> implements Callable<T> {
-
-        Callable<T> delegatee;
-
-        public CallableWithPermitRelease(Callable<T> delegatee) {
-            this.delegatee = delegatee;
-        }
-
-        @Override
-        public T call() throws Exception {
-            try {
-                return delegatee.call();
-            } finally {
-                queueingPermits.release();
-            }
-        }
-
+        rateLimitedPermits.acquire();
+        super.execute(command);
     }
 
     @Override
@@ -208,5 +139,4 @@ public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExec
             throws InterruptedException, ExecutionException, TimeoutException {
         throw new RuntimeException("Not implemented");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 11db4eb..7d1270b 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -11,9 +11,9 @@ s4.cluster.zk_connection_timeout = 10000
 # how many threads to use for the sender stage (i.e. serialization)
 s4.sender.parallelism=1
 # maximum number of events in the buffer of the sender stage
-s4.sender.workQueueSize=10000
+s4.sender.workQueueSize=100000
 # maximum number of pending writes to a given comm channel
 s4.emitter.maxPendingWrites=1000
 
 # maximum number of events in the buffer of the processing stage
-s4.stream.workQueueSize=10000
\ No newline at end of file
+s4.stream.workQueueSize=100000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
index 909fea6..20556f8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -6,7 +6,6 @@ import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.core.util.S4Metrics;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
@@ -26,7 +25,7 @@ public class AppModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(S4Metrics.class);
+        // bind(S4Metrics.class);
 
         bind(Receiver.class).to(ReceiverImpl.class);
         bind(Sender.class).to(SenderImpl.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 8b7a632..cf17f36 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -29,9 +29,9 @@ import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
-import org.apache.s4.core.staging.DefaultRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
-import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.LoadSheddingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
@@ -85,10 +85,10 @@ public class DefaultCoreModule extends AbstractModule {
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 
-        bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
-        bind(RemoteSendersExecutorServiceFactory.class).to(DefaultRemoteSendersExecutorServiceFactory.class);
+        bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
+        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
 
-        bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
+        bind(StreamExecutorServiceFactory.class).to(LoadSheddingStreamExecutorServiceFactory.class);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..5180062
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.staging;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Blocking implementation of the remote senders executor factory. It clones the implementation of the
+ * {@link BlockingSenderExecutorServiceFactory} class.
+ * 
+ */
+public class BlockingRemoteSendersExecutorServiceFactory extends BlockingSenderExecutorServiceFactory implements
+        RemoteSendersExecutorServiceFactory {
+
+    @Inject
+    public BlockingRemoteSendersExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        super(threadPoolSize, workQueueSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
new file mode 100644
index 0000000..aad3a19
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
@@ -0,0 +1,33 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.staging.BlockingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Blocking factory implementation for the sender executor service. It uses a mechanism that blocks the submission of
+ * events when the work queue is full.
+ * 
+ */
+public class BlockingSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
+
+    private final int threadPoolSize;
+    private final int workQueueSize;
+
+    @Inject
+    public BlockingSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+            @Named("s4.sender.workQueueSize") int workQueueSize) {
+        this.threadPoolSize = threadPoolSize;
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create() {
+        return new BlockingThreadPoolExecutorService(threadPoolSize, false,
+                (this instanceof RemoteSendersExecutorServiceFactory) ? "remote-sender-%d" : "sender-%d",
+                workQueueSize, getClass().getClassLoader());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
new file mode 100644
index 0000000..940a178
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
@@ -0,0 +1,24 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.staging.BlockingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class BlockingStreamExecutorServiceFactory implements StreamExecutorServiceFactory {
+
+    private final int workQueueSize;
+
+    @Inject
+    public BlockingStreamExecutorServiceFactory(@Named("s4.stream.workQueueSize") int workQueueSize) {
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create(int parallelism, String name, ClassLoader classLoader) {
+        return new BlockingThreadPoolExecutorService(1, false, name, workQueueSize, classLoader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
deleted file mode 100644
index b47087f..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.s4.core.staging;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * Default implementation of the remote senders executor factory. It clones the implementation of the
- * {@link DefaultSenderExecutorServiceFactory} class.
- * 
- */
-public class DefaultRemoteSendersExecutorServiceFactory extends DefaultSenderExecutorServiceFactory implements
-        RemoteSendersExecutorServiceFactory {
-
-    @Inject
-    public DefaultRemoteSendersExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
-            @Named("s4.sender.workQueueSize") int workQueueSize) {
-        super(threadPoolSize, workQueueSize);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
deleted file mode 100644
index c2ae49d..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.s4.core.staging;
-
-import java.util.concurrent.ExecutorService;
-
-import org.apache.s4.comm.staging.ThrottlingThreadPoolExecutorService;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * Default factory implementation for the sender executor service. It uses a mechanism for throttling the submission of
- * events and maintaining partial order.
- * 
- */
-public class DefaultSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
-
-    private final int threadPoolSize;
-    private final int workQueueSize;
-
-    @Inject
-    public DefaultSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
-            @Named("s4.sender.workQueueSize") int workQueueSize) {
-        this.threadPoolSize = threadPoolSize;
-        this.workQueueSize = workQueueSize;
-    }
-
-    @Override
-    public ExecutorService create() {
-        return new ThrottlingThreadPoolExecutorService(threadPoolSize, true,
-                (this instanceof DefaultRemoteSendersExecutorServiceFactory) ? "remote-sender-%d" : "sender-%d",
-                workQueueSize, getClass().getClassLoader());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
deleted file mode 100644
index 864b942..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.s4.core.staging;
-
-import java.util.concurrent.ExecutorService;
-
-import org.apache.s4.comm.staging.ThrottlingThreadPoolExecutorService;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * <p>
- * Default factory for the event processing stage executors.
- * </p>
- * <p>
- * It provides optional parallelism, when the processing activity requires blocking I/O operations, or is CPU-bound.
- * </p>
- * <p>
- * It throttles the submission of events while preserving partial ordering.
- * </p>
- * 
- */
-public class DefaultStreamProcessingExecutorServiceFactory implements StreamExecutorServiceFactory {
-
-    private int workQueueSize;
-
-    @Inject
-    public DefaultStreamProcessingExecutorServiceFactory(@Named("s4.stream.workQueueSize") int workQueueSize) {
-        this.workQueueSize = workQueueSize;
-    }
-
-    @Override
-    public ExecutorService create(int parallelism, String name, final ClassLoader classLoader) {
-        return new ThrottlingThreadPoolExecutorService(parallelism, true, "stream-" + name + "-%d", workQueueSize,
-                classLoader);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java
new file mode 100644
index 0000000..b9b8597
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java
@@ -0,0 +1,63 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.util.S4Metrics;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Load shedding factory for the event processing stage executors.
+ * <p>
+ * It provides optional parallelism, when the processing activity requires blocking I/O operations, or is CPU-bound.
+ * <p>
+ * It drops events when work queue is full.
+ * <p>
+ * More customized load shedding strategies can be defined based on this simple implementation.
+ * 
+ */
+public class LoadSheddingStreamExecutorServiceFactory implements StreamExecutorServiceFactory {
+
+    private final int workQueueSize;
+
+    @Inject
+    private S4Metrics metrics;
+
+    @Inject
+    public LoadSheddingStreamExecutorServiceFactory(@Named("s4.stream.workQueueSize") int workQueueSize) {
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create(int parallelism, final String name, final ClassLoader classLoader) {
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("stream-" + name + "-%d").setThreadFactory(new ThreadFactory() {
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setContextClassLoader(classLoader);
+                        return t;
+                    }
+                }).build();
+        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                metrics.droppedEvent(name);
+            }
+        };
+        ThreadPoolExecutor tpe = new ThreadPoolExecutor(parallelism, parallelism, 60, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(workQueueSize), threadFactory, rejectedExecutionHandler);
+        tpe.allowCoreThreadTimeOut(true);
+        return tpe;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
index db2df27..6a7a572 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
@@ -9,7 +9,9 @@ import org.apache.s4.core.App;
  * a configurable thread pool.
  * <p>
  * Implementations may use dependency injection to set some default parameters.
- * </p>
+ * <p>
+ * Implementations may rely on different strategies for handling high loads: blocking, throttling, dropping and that may
+ * also be provided on a per-stream basis (based on the name of the stream for instance).
  */
 public interface StreamExecutorServiceFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..20b7cb9
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,15 @@
+package org.apache.s4.core.staging;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ThrottlingRemoteSendersExecutorServiceFactory extends ThrottlingSenderExecutorServiceFactory implements
+        RemoteSendersExecutorServiceFactory {
+
+    @Inject
+    public ThrottlingRemoteSendersExecutorServiceFactory(@Named("s4.sender.maxRate") int maxRate,
+            @Named("s4.sender.parallelism") int threadPoolSize, @Named("s4.sender.workQueueSize") int workQueueSize) {
+        super(maxRate, threadPoolSize, workQueueSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
new file mode 100644
index 0000000..7e44928
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.staging.ThrottlingThreadPoolExecutorService;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ThrottlingSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
+
+    private final int maxRate;
+    private final int threadPoolSize;
+    private final int workQueueSize;
+
+    @Inject
+    public ThrottlingSenderExecutorServiceFactory(@Named("s4.sender.maxRate") int maxRate,
+            @Named("s4.sender.parallelism") int threadPoolSize, @Named("s4.sender.workQueueSize") int workQueueSize) {
+        this.maxRate = maxRate;
+        this.threadPoolSize = threadPoolSize;
+        this.workQueueSize = workQueueSize;
+    }
+
+    @Override
+    public ExecutorService create() {
+        LoggerFactory.getLogger(getClass()).info(
+                "Creating a throttling executor with a pool size of {} and max rate of {} events / s",
+                new String[] { String.valueOf(threadPoolSize), String.valueOf(maxRate), });
+        return new ThrottlingThreadPoolExecutorService(threadPoolSize, maxRate,
+                (this instanceof RemoteSendersExecutorServiceFactory) ? "remote-sender-%d" : "sender-%d",
+                workQueueSize, getClass().getClassLoader());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 1b3a900..de8897c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -135,6 +135,10 @@ public class S4Metrics {
         dequeuingStreamMeters.get(name).mark();
     }
 
+    public void droppedEvent(String streamName) {
+        droppedStreamMeters.get(streamName).mark();
+    }
+
     public void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
         Meter[] meters = new Meter[partitionCount];
         for (int i = 0; i < partitionCount; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6fe7ea8a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index e18acbf..ae19011 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,8 +20,8 @@ package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.DeserializerExecutorFactory;
 import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
-import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
-import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
@@ -50,9 +50,11 @@ public class MockCoreModule extends AbstractModule {
 
         // Although we want to mock as much as possible, most tests still require the machinery for routing events
         // within a stream/node, therefore sender and stream executors are not mocked
-        bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
 
-        bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+        // NOTE: we use a blocking executor so that events don't get dropped in simple tests
+        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+
+        bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
         bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 
         bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);


Mime
View raw message