james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From matth...@apache.org
Subject [james-project] 04/09: JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load
Date Fri, 14 Jun 2019 16:32:55 GMT
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cf0a390afd8b62daf7719c2b9233d03f34c0f9aa
Author: Tran Tien Duc <dtran@linagora.com>
AuthorDate: Wed Jun 5 15:16:56 2019 +0700

    JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load
---
 .../james/mailbox/events/EventDispatcher.java      |  7 ++++-
 .../james/mailbox/events/RabbitMQEventBus.java     |  3 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 35 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 09c777e..5cb24aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXC
 
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
 import org.apache.james.event.json.EventSerializer;
@@ -57,6 +58,7 @@ class EventDispatcher {
     private final LocalListenerRegistry localListenerRegistry;
     private final AMQP.BasicProperties basicProperties;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    final AtomicInteger dispatchCount = new AtomicInteger();
 
     EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
@@ -81,6 +83,7 @@ class EventDispatcher {
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
             .then()
+            .doOnSuccess(any -> dispatchCount.incrementAndGet())
             .subscribeWith(MonoProcessor.create());
     }
 
@@ -124,7 +127,9 @@ class EventDispatcher {
         Stream<OutboundMessage> outboundMessages = routingKeys
             .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent));
 
-        return sender.send(Flux.fromStream(outboundMessages));
+        return sender.send(Flux.fromStream(outboundMessages))
+            .publishOn(Schedulers.elastic())
+            .doOnError(th -> th.printStackTrace());
     }
 
     private byte[] serializeEvent(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 041ded3..8ec96d6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
@@ -55,7 +56,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private volatile boolean isStopping;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
-    private EventDispatcher eventDispatcher;
+    EventDispatcher eventDispatcher;
     private Sender sender;
 
     @Inject
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 66ae6d3..f94b960 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.james.mailbox.events;
 
+import static com.jayway.awaitility.Awaitility.await;
 import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.newCountingListener;
+import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.totalEventsReceived;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
@@ -47,6 +50,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
@@ -70,6 +74,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableList;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
@@ -181,6 +186,36 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
         EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
 
+        @Disabled("consuming too many threads")
+        @Test
+        void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+
+            eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+            int totalGlobalRegistrations = 1; // GroupA + GroupB + GroupC
+
+            int threadCount = 10;
+            int operationCount = 10000;
+            int totalDispatchOperations = threadCount * operationCount;
+            eventBus = (RabbitMQEventBus) eventBus();
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS))
+                .threadCount(threadCount)
+                .operationCount(operationCount)
+                .runSuccessfullyWithin(Duration.ofMinutes(10));
+
+            // there is a moment when RabbitMQ EventBus consumed amount of messages, then it will stop to consume more
+            await()
+                .pollInterval(com.jayway.awaitility.Duration.FIVE_SECONDS)
+                .timeout(com.jayway.awaitility.Duration.TEN_MINUTES).until(() -> {
+                    int totalEventsReceived = totalEventsReceived(ImmutableList.of(countingListener1));
+                    System.out.println("event received: " + totalEventsReceived);
+                    System.out.println("dispatching count: " + eventBus.eventDispatcher.dispatchCount.get());
+                    assertThat(totalEventsReceived)
+                        .isEqualTo(totalGlobalRegistrations * totalDispatchOperations);
+                });
+        }
+
         @Override
         public EventBus eventBus3() {
             return eventBus3;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message