james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From matth...@apache.org
Subject [james-project] 05/09: JAMES-2786 limit the number of thread required for the concurrent stress test
Date Fri, 14 Jun 2019 16:32:56 GMT
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit edce29a4a9154ba0f92e2f0d4bc14bf5f7b6ff2c
Author: Matthieu Baechler <matthieu@apache.org>
AuthorDate: Wed Jun 12 18:16:06 2019 +0200

    JAMES-2786 limit the number of thread required for the concurrent stress test
---
 .../main/java/org/apache/james/mailbox/events/EventDispatcher.java  | 6 ++----
 .../java/org/apache/james/mailbox/events/GroupRegistration.java     | 2 +-
 .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java  | 3 +--
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 5cb24aa..8f31102 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -82,6 +82,7 @@ class EventDispatcher {
             .concat(
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
+            .subscribeOn(Schedulers.elastic())
             .then()
             .doOnSuccess(any -> dispatchCount.incrementAndGet())
             .subscribeWith(MonoProcessor.create());
@@ -89,7 +90,6 @@ class EventDispatcher {
 
     private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
-            .subscribeOn(Schedulers.elastic())
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
             .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
@@ -127,9 +127,7 @@ class EventDispatcher {
         Stream<OutboundMessage> outboundMessages = routingKeys
             .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent));
 
-        return sender.send(Flux.fromStream(outboundMessages))
-            .publishOn(Schedulers.elastic())
-            .doOnError(th -> th.printStackTrace());
+        return sender.send(Flux.fromStream(outboundMessages));
     }
 
     private byte[] serializeEvent(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 985da0a..5164c2d 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,7 +140,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
-            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))).publishOn(Schedulers.elastic()))
+            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack))
             .subscribeWith(MonoProcessor.create())
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f94b960..eb285d0 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -186,7 +186,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
         EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
 
-        @Disabled("consuming too many threads")
         @Test
         void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
@@ -199,7 +198,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             int totalDispatchOperations = threadCount * operationCount;
             eventBus = (RabbitMQEventBus) eventBus();
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS).block())
                 .threadCount(threadCount)
                 .operationCount(operationCount)
                 .runSuccessfullyWithin(Duration.ofMinutes(10));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message