james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [james-project] 05/12: JAMES-2659 simplify call to local listeners
Date Wed, 13 Feb 2019 08:18:12 GMT
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5edbc264ff63f056a0b3fcf6d1073919377ddcc1
Author: Matthieu Baechler <matthieu@apache.org>
AuthorDate: Thu Feb 7 15:21:02 2019 +0100

    JAMES-2659 simplify call to local listeners
---
 .../james/mailbox/events/EventDispatcher.java      | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index d7733b5..2f90983 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import java.util.stream.Stream;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.events.RoutingKeyConverter.RoutingKey;
 import org.apache.james.util.MDCBuilder;
@@ -37,8 +36,8 @@ import org.apache.james.util.StructuredLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.rabbitmq.client.AMQP;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -47,6 +46,7 @@ import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
+import reactor.util.function.Tuples;
 
 class EventDispatcher {
     private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
@@ -87,20 +87,26 @@ class EventDispatcher {
         return Flux.fromIterable(keys)
             .subscribeOn(Schedulers.elastic())
             .flatMap(key -> mailboxListenerRegistry.getLocalMailboxListeners(key)
-                .map(listener -> Pair.of(key, listener)))
-            .filter(pair -> pair.getRight().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
-            .flatMap(pair -> Mono.fromRunnable(Throwing.runnable(() -> executeListener(event,
pair.getRight(), pair.getLeft())))
-                .doOnError(e -> structuredLogger(event, keys)
-                    .log(logger -> logger.error("Exception happens when dispatching event",
e)))
-                .onErrorResume(e -> Mono.empty()))
+                .map(listener -> Tuples.of(key, listener)))
+            .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
+            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
             .then();
     }
 
-    private void executeListener(Event event, MailboxListener mailboxListener, RegistrationKey
registrationKey) throws Exception {
-        mailboxListenerExecutor.execute(mailboxListener,
-            MDCBuilder.create()
-                .addContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, registrationKey),
-            event);
+    private Mono<Void> executeListener(Event event, MailboxListener mailboxListener,
RegistrationKey registrationKey) {
+        return Mono.from((sink) -> {
+            try {
+                mailboxListenerExecutor.execute(mailboxListener,
+                    MDCBuilder.create()
+                        .addContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, registrationKey),
+                    event);
+            } catch (Exception e) {
+                structuredLogger(event, ImmutableSet.of(registrationKey))
+                    .log(logger -> logger.error("Exception happens when dispatching event",
e));
+            }
+            sink.onComplete();
+        });
+
     }
 
     private StructuredLogger structuredLogger(Event event, Set<RegistrationKey> keys)
{


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message