james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From matth...@apache.org
Subject [3/4] james-project git commit: JAMES-2659 re-implement guard logic for RabbitEventBus start/stop
Date Fri, 08 Feb 2019 09:23:34 GMT
JAMES-2659 re-implement guard logic for RabbitEventBus start/stop


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/52ec5dbb
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/52ec5dbb
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/52ec5dbb

Branch: refs/heads/master
Commit: 52ec5dbb2065d17dc3ffa04f864c32332609e282
Parents: d086637
Author: Matthieu Baechler <matthieu@apache.org>
Authored: Thu Feb 7 17:07:21 2019 +0100
Committer: Matthieu Baechler <matthieu@apache.org>
Committed: Fri Feb 8 10:19:55 2019 +0100

----------------------------------------------------------------------
 .../james/mailbox/events/RabbitMQEventBus.java  | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/52ec5dbb/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 2a9a07d..c800850 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -20,7 +20,6 @@
 package org.apache.james.mailbox.events;
 
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -31,7 +30,6 @@ import org.apache.james.metrics.api.MetricFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Connection;
-
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
@@ -44,13 +42,14 @@ public class RabbitMQEventBus implements EventBus {
 
     private final Mono<Connection> connectionMono;
     private final EventSerializer eventSerializer;
-    private final AtomicBoolean isRunning;
     private final RoutingKeyConverter routingKeyConverter;
     private final RetryBackoffConfiguration retryBackoff;
     private final EventBusId eventBusId;
     private final EventDeadLetters eventDeadLetters;
     private final MailboxListenerExecutor mailboxListenerExecutor;
 
+    private volatile boolean isRunning;
+    private volatile boolean isStopping;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     private EventDispatcher eventDispatcher;
@@ -68,11 +67,12 @@ public class RabbitMQEventBus implements EventBus {
         this.routingKeyConverter = routingKeyConverter;
         this.retryBackoff = retryBackoff;
         this.eventDeadLetters = eventDeadLetters;
-        this.isRunning = new AtomicBoolean(false);
+        this.isRunning = false;
+        this.isStopping = false;
     }
 
     public void start() {
-        if (!isRunning.get()) {
+        if (!isRunning && !isStopping) {
             sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
                 .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
             MailboxListenerRegistry mailboxListenerRegistry = new MailboxListenerRegistry();
@@ -82,35 +82,45 @@ public class RabbitMQEventBus implements EventBus {
 
             eventDispatcher.start();
             keyRegistrationHandler.start();
-            isRunning.set(true);
+            isRunning = true;
         }
     }
 
     @PreDestroy
     public void stop() {
-        if (isRunning.get()) {
+        if (isRunning && !isStopping) {
+            isStopping = true;
+            isRunning = false;
             groupRegistrationHandler.stop();
             keyRegistrationHandler.stop();
             sender.close();
-            isRunning.set(false);
         }
     }
 
     @Override
     public Registration register(MailboxListener listener, RegistrationKey key) {
-        return keyRegistrationHandler.register(listener, key);
+        if (isRunning) {
+            return keyRegistrationHandler.register(listener, key);
+        }
+        throw new IllegalStateException("Event Bus is not running");
     }
 
     @Override
     public Registration register(MailboxListener listener, Group group) {
-        return groupRegistrationHandler.register(listener, group);
+        if (isRunning) {
+            return groupRegistrationHandler.register(listener, group);
+        }
+        throw new IllegalStateException("Event Bus is not running");
     }
 
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
-        if (!event.isNoop()) {
-            return eventDispatcher.dispatch(event, key);
+        if (isRunning) {
+            if (!event.isNoop()) {
+                return eventDispatcher.dispatch(event, key);
+            }
+            return Mono.empty();
         }
-        return Mono.empty();
+        throw new IllegalStateException("Event Bus is not running");
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message