james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [james-project] 08/12: JAMES-2659 Rename ambiguous Registry
Date Wed, 13 Feb 2019 08:18:15 GMT
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5d64652ac1263d8a03a3efe96475835bb82ec56f
Author: Benoit Tellier <btellier@linagora.com>
AuthorDate: Tue Feb 12 11:06:56 2019 +0700

    JAMES-2659 Rename ambiguous Registry
    
    Make it explicit that we keep track of locally registered listeners
---
 .../james/mailbox/events/EventDispatcher.java      |  9 ++++----
 .../mailbox/events/KeyRegistrationHandler.java     | 10 ++++-----
 ...nerRegistry.java => LocalListenerRegistry.java} | 13 ++++++------
 .../james/mailbox/events/RabbitMQEventBus.java     |  7 ++++---
 ...tryTest.java => LocalListenerRegistryTest.java} | 24 +++++++++++-----------
 5 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 2f90983..09c777e 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.rabbitmq.client.AMQP;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoProcessor;
@@ -53,14 +54,14 @@ class EventDispatcher {
 
     private final EventSerializer eventSerializer;
     private final Sender sender;
-    private final MailboxListenerRegistry mailboxListenerRegistry;
+    private final LocalListenerRegistry localListenerRegistry;
     private final AMQP.BasicProperties basicProperties;
     private final MailboxListenerExecutor mailboxListenerExecutor;
 
-    EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, MailboxListenerRegistry mailboxListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
         this.sender = sender;
-        this.mailboxListenerRegistry = mailboxListenerRegistry;
+        this.localListenerRegistry = localListenerRegistry;
         this.basicProperties = new AMQP.BasicProperties.Builder()
             .headers(ImmutableMap.of(EVENT_BUS_ID, eventBusId.asString()))
             .build();
@@ -86,7 +87,7 @@ class EventDispatcher {
     private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
             .subscribeOn(Schedulers.elastic())
-            .flatMap(key -> mailboxListenerRegistry.getLocalMailboxListeners(key)
+            .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
             .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
             .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index dac9d93..59ed7c1 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -53,7 +53,7 @@ class KeyRegistrationHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
 
     private final EventBusId eventBusId;
-    private final MailboxListenerRegistry mailboxListenerRegistry;
+    private final LocalListenerRegistry localListenerRegistry;
     private final EventSerializer eventSerializer;
     private final Sender sender;
     private final RoutingKeyConverter routingKeyConverter;
@@ -63,12 +63,12 @@ class KeyRegistrationHandler {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, MailboxListenerRegistry mailboxListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
         this.sender = sender;
         this.routingKeyConverter = routingKeyConverter;
-        this.mailboxListenerRegistry = mailboxListenerRegistry;
+        this.localListenerRegistry = localListenerRegistry;
         this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.registrationQueue = new RegistrationQueueName();
@@ -99,7 +99,7 @@ class KeyRegistrationHandler {
     }
 
     Registration register(MailboxListener listener, RegistrationKey key) {
-        MailboxListenerRegistry.Registration registration = mailboxListenerRegistry.addListener(key, listener);
+        LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
         if (registration.isFirstListener()) {
             registrationBinder.bind(key).block();
         }
@@ -122,7 +122,7 @@ class KeyRegistrationHandler {
         RegistrationKey registrationKey = routingKeyConverter.toRegistrationKey(routingKey);
         Event event = toEvent(delivery);
 
-        return mailboxListenerRegistry.getLocalMailboxListeners(registrationKey)
+        return localListenerRegistry.getLocalMailboxListeners(registrationKey)
             .filter(listener -> !isLocalSynchronousListeners(eventBusId, listener))
             .flatMap(listener -> Mono.fromRunnable(Throwing.runnable(() -> executeListener(listener, event, registrationKey)))
                 .doOnError(e -> structuredLogger(event, registrationKey)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/LocalListenerRegistry.java
similarity index 91%
rename from mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
rename to mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/LocalListenerRegistry.java
index 35ebf3d..faa0d23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/LocalListenerRegistry.java
@@ -28,19 +28,20 @@ import java.util.function.Supplier;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableSet;
+
 import reactor.core.publisher.Flux;
 
-class MailboxListenerRegistry {
+class LocalListenerRegistry {
 
     interface RemovalStatus {
         boolean lastListenerRemoved();
     }
 
-    public static class Registration {
+    public static class LocalRegistration {
         private final boolean firstListener;
         private final Supplier<RemovalStatus> unregister;
 
-        public Registration(boolean firstListener, Supplier<RemovalStatus> unregister) {
+        public LocalRegistration(boolean firstListener, Supplier<RemovalStatus> unregister) {
             this.firstListener = firstListener;
             this.unregister = unregister;
         }
@@ -56,11 +57,11 @@ class MailboxListenerRegistry {
 
     private final ConcurrentHashMap<RegistrationKey, ImmutableSet<MailboxListener>> listenersByKey;
 
-    MailboxListenerRegistry() {
+    LocalListenerRegistry() {
         this.listenersByKey = new ConcurrentHashMap<>();
     }
 
-    Registration addListener(RegistrationKey registrationKey, MailboxListener listener) {
+    LocalRegistration addListener(RegistrationKey registrationKey, MailboxListener listener) {
         AtomicBoolean firstListener = new AtomicBoolean(false);
         listenersByKey.compute(registrationKey, (key, listeners) ->
             Optional.ofNullable(listeners)
@@ -70,7 +71,7 @@ class MailboxListenerRegistry {
                     return ImmutableSet.of(listener);
                 })
         );
-        return new Registration(firstListener.get(), () -> removeListener(registrationKey, listener));
+        return new LocalRegistration(firstListener.get(), () -> removeListener(registrationKey, listener));
     }
 
     private RemovalStatus removeListener(RegistrationKey registrationKey, MailboxListener listener) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index c800850..c742107 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -30,6 +30,7 @@ import org.apache.james.metrics.api.MetricFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
@@ -75,10 +76,10 @@ public class RabbitMQEventBus implements EventBus {
         if (!isRunning && !isStopping) {
             sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
                 .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
-            MailboxListenerRegistry mailboxListenerRegistry = new MailboxListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, mailboxListenerRegistry, mailboxListenerExecutor);
+            LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, mailboxListenerRegistry, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
 
             eventDispatcher.start();
             keyRegistrationHandler.start();
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
similarity index 89%
rename from mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
rename to mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
index 2d732dc..05a6a13 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
@@ -34,14 +34,14 @@ import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-class MailboxListenerRegistryTest {
+class LocalListenerRegistryTest {
     private static final MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(TestId.of(42));
 
-    private MailboxListenerRegistry testee;
+    private LocalListenerRegistry testee;
 
     @BeforeEach
     void setUp() {
-        testee = new MailboxListenerRegistry();
+        testee = new LocalListenerRegistry();
     }
 
     @Test
@@ -75,7 +75,7 @@ class MailboxListenerRegistryTest {
         MailboxListener listener1 = event -> {};
         MailboxListener listener2 = event -> {};
         testee.addListener(KEY_1, listener1);
-        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener2);
+        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener2);
 
         registration.unregister();
 
@@ -105,7 +105,7 @@ class MailboxListenerRegistryTest {
         MailboxListener listener = event -> {};
         MailboxListener listener2 = event -> {};
 
-        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener);
+        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener);
         testee.addListener(KEY_1, listener2);
 
         assertThat(registration.unregister().lastListenerRemoved()).isFalse();
@@ -116,7 +116,7 @@ class MailboxListenerRegistryTest {
         MailboxListener listener = event -> {};
 
 
-        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener);
+        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener);
 
         assertThat(registration.unregister().lastListenerRemoved()).isTrue();
     }
@@ -167,7 +167,7 @@ class MailboxListenerRegistryTest {
         void getLocalMailboxListenersShouldReturnEmptyWhenRemoveAddedListener() throws Exception {
             MailboxListener listener1 = event -> {};
 
-            MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener1);
+            LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener1);
 
             ConcurrentTestRunner.builder()
                 .operation(((threadNumber, operationNumber) -> registration.unregister()))
@@ -190,17 +190,17 @@ class MailboxListenerRegistryTest {
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, operationNumber) -> {
                     if (threadNumber % 3 == 0) {
-                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener1);
+                        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener1);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
                     } else if (threadNumber % 3 == 1) {
-                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener2);
+                        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener2);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
                     } else if (threadNumber % 3 == 2) {
-                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener3);
+                        LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener3);
                         if (registration.isFirstListener()) {
                             firstListenerCount.incrementAndGet();
                         }
@@ -218,7 +218,7 @@ class MailboxListenerRegistryTest {
             MailboxListener listener1 = event -> {};
             AtomicInteger lastListenerRemoved = new AtomicInteger(0);
 
-            MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener1);
+            LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener1);
             ConcurrentTestRunner.builder()
                 .operation(((threadNumber, operationNumber) -> {
                     if (registration.unregister().lastListenerRemoved()) {
@@ -244,7 +244,7 @@ class MailboxListenerRegistryTest {
             testee.addListener(KEY_1, listener2);
             testee.addListener(KEY_1, listener3);
             testee.addListener(KEY_1, listener4);
-            MailboxListenerRegistry.Registration registration5 = testee.addListener(KEY_1, listener5);
+            LocalListenerRegistry.LocalRegistration registration5 = testee.addListener(KEY_1, listener5);
 
             Mono<List<MailboxListener>> listeners = testee.getLocalMailboxListeners(KEY_1)
                 .publishOn(Schedulers.elastic())


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message