james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [james-project] 07/12: JAMES-2659 implement a simpler API for MailboxListenerRegistry
Date Wed, 13 Feb 2019 08:18:14 GMT
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c038bc28ab2fe73cfbaa16a119fb681beebe895d
Author: Matthieu Baechler <matthieu@apache.org>
AuthorDate: Fri Feb 8 13:40:43 2019 +0100

    JAMES-2659 implement a simpler API for MailboxListenerRegistry
---
 .../mailbox/events/KeyRegistrationHandler.java     |  16 +--
 .../mailbox/events/MailboxListenerRegistry.java    |  47 ++++++--
 .../events/MailboxListenerRegistryTest.java        | 122 ++++++++++-----------
 3 files changed, 106 insertions(+), 79 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index f0e2acf..dac9d93 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -99,13 +99,15 @@ class KeyRegistrationHandler {
     }
 
     Registration register(MailboxListener listener, RegistrationKey key) {
-        Runnable bindIfEmpty = () -> registrationBinder.bind(key).block();
-        Runnable unbindIfEmpty = () -> registrationBinder.unbind(key).block();
-        Runnable unregister = () -> mailboxListenerRegistry.removeListener(key, listener,
unbindIfEmpty);
-
-        KeyRegistration keyRegistration = new KeyRegistration(unregister);
-        mailboxListenerRegistry.addListener(key, listener, bindIfEmpty);
-        return keyRegistration;
+        MailboxListenerRegistry.Registration registration = mailboxListenerRegistry.addListener(key,
listener);
+        if (registration.isFirstListener()) {
+            registrationBinder.bind(key).block();
+        }
+        return new KeyRegistration(() -> {
+            if (registration.unregister().lastListenerRemoved()) {
+                registrationBinder.unbind(key).block();
+            }
+        });
     }
 
     private Mono<Void> handleDelivery(Delivery delivery) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
index cd5cc52..35ebf3d 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
@@ -23,44 +23,77 @@ import static com.google.common.base.Predicates.not;
 
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableSet;
 import reactor.core.publisher.Flux;
 
 class MailboxListenerRegistry {
+
+    interface RemovalStatus {
+        boolean lastListenerRemoved();
+    }
+
+    public static class Registration {
+        private final boolean firstListener;
+        private final Supplier<RemovalStatus> unregister;
+
+        public Registration(boolean firstListener, Supplier<RemovalStatus> unregister)
{
+            this.firstListener = firstListener;
+            this.unregister = unregister;
+        }
+
+        public boolean isFirstListener() {
+            return firstListener;
+        }
+
+        public RemovalStatus unregister() {
+            return unregister.get();
+        }
+    }
+
     private final ConcurrentHashMap<RegistrationKey, ImmutableSet<MailboxListener>>
listenersByKey;
 
     MailboxListenerRegistry() {
         this.listenersByKey = new ConcurrentHashMap<>();
     }
 
-    void addListener(RegistrationKey registrationKey, MailboxListener listener, Runnable
runIfEmpty) {
+    Registration addListener(RegistrationKey registrationKey, MailboxListener listener) {
+        AtomicBoolean firstListener = new AtomicBoolean(false);
         listenersByKey.compute(registrationKey, (key, listeners) ->
             Optional.ofNullable(listeners)
                 .map(set -> ImmutableSet.<MailboxListener>builder().addAll(set).add(listener).build())
                 .orElseGet(() -> {
-                    runIfEmpty.run();
+                    firstListener.set(true);
                     return ImmutableSet.of(listener);
                 })
         );
+        return new Registration(firstListener.get(), () -> removeListener(registrationKey,
listener));
     }
 
-    void removeListener(RegistrationKey registrationKey, MailboxListener listener, Runnable
runIfEmpty) {
+    private RemovalStatus removeListener(RegistrationKey registrationKey, MailboxListener
listener) {
+        AtomicBoolean lastListenerRemoved = new AtomicBoolean(false);
         listenersByKey.compute(registrationKey, (key, listeners) -> {
             boolean listenersContainRequested = Optional.ofNullable(listeners).orElse(ImmutableSet.of()).contains(listener);
             if (listenersContainRequested) {
-                return removeListenerFromSet(listener, runIfEmpty, listeners);
+                ImmutableSet<MailboxListener> remainingListeners = removeListenerFromSet(listener,
listeners);
+                if (remainingListeners.isEmpty()) {
+                    lastListenerRemoved.set(true);
+                    return null;
+                }
+                return remainingListeners;
             }
             return listeners;
         });
+        return lastListenerRemoved::get;
     }
 
-    private ImmutableSet<MailboxListener> removeListenerFromSet(MailboxListener listener,
Runnable runIfEmpty, ImmutableSet<MailboxListener> listeners) {
+    private ImmutableSet<MailboxListener> removeListenerFromSet(MailboxListener listener,
ImmutableSet<MailboxListener> listeners) {
         ImmutableSet<MailboxListener> remainingListeners = listeners.stream().filter(not(listener::equals)).collect(Guavate.toImmutableSet());
         if (remainingListeners.isEmpty()) {
-            runIfEmpty.run();
-            return null;
+            return ImmutableSet.of();
         }
         return remainingListeners;
     }
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
index 6320274..2d732dc 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.james.mailbox.model.TestId;
@@ -37,8 +36,6 @@ import reactor.core.scheduler.Schedulers;
 
 class MailboxListenerRegistryTest {
     private static final MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(TestId.of(42));
-    private static final Runnable NOOP = () -> {
-    };
 
     private MailboxListenerRegistry testee;
 
@@ -56,7 +53,7 @@ class MailboxListenerRegistryTest {
     @Test
     void getLocalMailboxListenersShouldReturnPreviouslyAddedListener() {
         MailboxListener listener = event -> {};
-        testee.addListener(KEY_1, listener, NOOP);
+        testee.addListener(KEY_1, listener);
 
         assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
             .containsOnly(listener);
@@ -66,8 +63,8 @@ class MailboxListenerRegistryTest {
     void getLocalMailboxListenersShouldReturnPreviouslyAddedListeners() {
         MailboxListener listener1 = event -> {};
         MailboxListener listener2 = event -> {};
-        testee.addListener(KEY_1, listener1, NOOP);
-        testee.addListener(KEY_1, listener2, NOOP);
+        testee.addListener(KEY_1, listener1);
+        testee.addListener(KEY_1, listener2);
 
         assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
             .containsOnly(listener1, listener2);
@@ -77,68 +74,51 @@ class MailboxListenerRegistryTest {
     void getLocalMailboxListenersShouldNotReturnRemovedListeners() {
         MailboxListener listener1 = event -> {};
         MailboxListener listener2 = event -> {};
-        testee.addListener(KEY_1, listener1, NOOP);
-        testee.addListener(KEY_1, listener2, NOOP);
+        testee.addListener(KEY_1, listener1);
+        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener2);
 
-        testee.removeListener(KEY_1, listener2, NOOP);
+        registration.unregister();
 
         assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
             .containsOnly(listener1);
     }
 
     @Test
-    void addListenerShouldRunTaskWhenNoPreviouslyRegisteredListeners() {
+    void addListenerShouldReturnFirstListenerWhenNoPreviouslyRegisteredListeners() {
         MailboxListener listener = event -> {};
 
-        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
-        testee.addListener(KEY_1, listener, () -> atomicBoolean.set(true));
-
-        assertThat(atomicBoolean).isTrue();
-    }
-
-    @Test
-    void addListenerShouldNotRunTaskWhenPreviouslyRegisteredListeners() {
-        MailboxListener listener = event -> {};
-
-        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
-        testee.addListener(KEY_1, listener, NOOP);
-        testee.addListener(KEY_1, listener, () -> atomicBoolean.set(true));
-
-        assertThat(atomicBoolean).isFalse();
+        assertThat(testee.addListener(KEY_1, listener).isFirstListener()).isTrue();
     }
 
     @Test
-    void removeListenerShouldNotRunTaskWhenNoListener() {
+    void addListenerShouldNotReturnFirstListenerWhenPreviouslyRegisteredListeners() {
         MailboxListener listener = event -> {};
+        MailboxListener listener2 = event -> {};
 
-        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
-        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
+        testee.addListener(KEY_1, listener);
 
-        assertThat(atomicBoolean).isFalse();
+        assertThat(testee.addListener(KEY_1, listener2).isFirstListener()).isFalse();
     }
 
     @Test
-    void removeListenerShouldNotRunTaskWhenSeveralListener() {
+    void removeListenerShouldNotReturnLastListenerRemovedWhenSeveralListener() {
         MailboxListener listener = event -> {};
         MailboxListener listener2 = event -> {};
 
-        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
-        testee.addListener(KEY_1, listener, NOOP);
-        testee.addListener(KEY_1, listener2, NOOP);
-        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
+        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener);
+        testee.addListener(KEY_1, listener2);
 
-        assertThat(atomicBoolean).isFalse();
+        assertThat(registration.unregister().lastListenerRemoved()).isFalse();
     }
 
     @Test
-    void removeListenerShouldRunTaskWhenOneListener() {
+    void removeListenerShouldReturnLastListenerRemovedWhenOneListener() {
         MailboxListener listener = event -> {};
 
-        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
-        testee.addListener(KEY_1, listener, NOOP);
-        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
 
-        assertThat(atomicBoolean).isTrue();
+        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1, listener);
+
+        assertThat(registration.unregister().lastListenerRemoved()).isTrue();
     }
 
     @Nested
@@ -150,7 +130,7 @@ class MailboxListenerRegistryTest {
             MailboxListener listener = event -> {};
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> testee.addListener(KEY_1,
listener, NOOP))
+                .operation((threadNumber, operationNumber) -> testee.addListener(KEY_1,
listener))
                 .threadCount(10)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);
@@ -168,11 +148,11 @@ class MailboxListenerRegistryTest {
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, operationNumber) -> {
                     if (threadNumber % 3 == 0) {
-                        testee.addListener(KEY_1, listener1, NOOP);
+                        testee.addListener(KEY_1, listener1);
                     } else if (threadNumber % 3 == 1) {
-                        testee.addListener(KEY_1, listener2, NOOP);
+                        testee.addListener(KEY_1, listener2);
                     } else if (threadNumber % 3 == 2) {
-                        testee.addListener(KEY_1, listener3, NOOP);
+                        testee.addListener(KEY_1, listener3);
                     }
                 })
                 .threadCount(6)
@@ -187,11 +167,10 @@ class MailboxListenerRegistryTest {
         void getLocalMailboxListenersShouldReturnEmptyWhenRemoveAddedListener() throws Exception
{
             MailboxListener listener1 = event -> {};
 
-            testee.addListener(KEY_1, listener1, NOOP);
+            MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1,
listener1);
 
             ConcurrentTestRunner.builder()
-                .operation(((threadNumber, operationNumber) ->
-                    testee.removeListener(KEY_1, listener1, NOOP)))
+                .operation(((threadNumber, operationNumber) -> registration.unregister()))
                 .threadCount(10)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);
@@ -201,65 +180,78 @@ class MailboxListenerRegistryTest {
         }
 
         @Test
-        void addListenerOnlyRunTaskOnceForEmptyRegistry() throws Exception {
+        void addListenerOnlyReturnIsFirstListenerForEmptyRegistry() throws Exception {
             MailboxListener listener1 = event -> {};
             MailboxListener listener2 = event -> {};
             MailboxListener listener3 = event -> {};
 
-            AtomicInteger runIfEmptyCount = new AtomicInteger(0);
+            AtomicInteger firstListenerCount = new AtomicInteger(0);
 
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, operationNumber) -> {
                     if (threadNumber % 3 == 0) {
-                        testee.addListener(KEY_1, listener1, runIfEmptyCount::incrementAndGet);
+                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1,
listener1);
+                        if (registration.isFirstListener()) {
+                            firstListenerCount.incrementAndGet();
+                        }
                     } else if (threadNumber % 3 == 1) {
-                        testee.addListener(KEY_1, listener2, runIfEmptyCount::incrementAndGet);
+                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1,
listener2);
+                        if (registration.isFirstListener()) {
+                            firstListenerCount.incrementAndGet();
+                        }
                     } else if (threadNumber % 3 == 2) {
-                        testee.addListener(KEY_1, listener3, runIfEmptyCount::incrementAndGet);
+                        MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1,
listener3);
+                        if (registration.isFirstListener()) {
+                            firstListenerCount.incrementAndGet();
+                        }
                     }
                 })
                 .threadCount(6)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);
 
-            assertThat(runIfEmptyCount.get()).isEqualTo(1);
+            assertThat(firstListenerCount.get()).isEqualTo(1);
         }
 
         @Test
-        void removeListenerOnlyRunTaskOnceForEmptyRegistry() throws Exception {
+        void removeListenerOnlyReturnLastListenerRemovedForEmptyRegistry() throws Exception
{
             MailboxListener listener1 = event -> {};
-            AtomicInteger runIfEmptyCount = new AtomicInteger(0);
+            AtomicInteger lastListenerRemoved = new AtomicInteger(0);
 
-            testee.addListener(KEY_1, listener1, NOOP);
+            MailboxListenerRegistry.Registration registration = testee.addListener(KEY_1,
listener1);
             ConcurrentTestRunner.builder()
-                .operation(((threadNumber, operationNumber) -> testee.removeListener(KEY_1,
listener1, runIfEmptyCount::incrementAndGet)))
+                .operation(((threadNumber, operationNumber) -> {
+                    if (registration.unregister().lastListenerRemoved()) {
+                        lastListenerRemoved.incrementAndGet();
+                    }
+                }))
                 .threadCount(10)
                 .operationCount(10)
                 .runSuccessfullyWithin(ONE_SECOND);
 
-            assertThat(runIfEmptyCount.get()).isEqualTo(1);
+            assertThat(lastListenerRemoved.get()).isEqualTo(1);
         }
 
         @Test
-        void iterationShouldPerformOnASnapshotOfListenersSet() throws Exception {
+        void iterationShouldPerformOnASnapshotOfListenersSet() {
             MailboxListener listener1 = event -> {};
             MailboxListener listener2 = event -> {};
             MailboxListener listener3 = event -> {};
             MailboxListener listener4 = event -> {};
             MailboxListener listener5 = event -> {};
 
-            testee.addListener(KEY_1, listener1, NOOP);
-            testee.addListener(KEY_1, listener2, NOOP);
-            testee.addListener(KEY_1, listener3, NOOP);
-            testee.addListener(KEY_1, listener4, NOOP);
-            testee.addListener(KEY_1, listener5, NOOP);
+            testee.addListener(KEY_1, listener1);
+            testee.addListener(KEY_1, listener2);
+            testee.addListener(KEY_1, listener3);
+            testee.addListener(KEY_1, listener4);
+            MailboxListenerRegistry.Registration registration5 = testee.addListener(KEY_1,
listener5);
 
             Mono<List<MailboxListener>> listeners = testee.getLocalMailboxListeners(KEY_1)
                 .publishOn(Schedulers.elastic())
                 .delayElements(Duration.ofMillis(100))
                 .collectList();
 
-            testee.removeListener(KEY_1, listener5, NOOP);
+            registration5.unregister();
 
             assertThat(listeners.block(Duration.ofSeconds(10))).hasSize(5);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message