james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 03/03: JAMES-2813 Introduce ReactorRabbitMQChannelPool.createSender
Date Tue, 03 Sep 2019 15:30:28 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0b7669b9b85eb2bcf81c0963410555979c091d47
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Wed Aug 28 10:00:25 2019 +0200

    JAMES-2813 Introduce ReactorRabbitMQChannelPool.createSender
---
 .../backend/rabbitmq/ReactorRabbitMQChannelPool.java      | 15 +++++++++++++++
 .../org/apache/james/mailbox/events/RabbitMQEventBus.java | 11 ++---------
 .../distributed/RabbitMQTerminationSubscriber.java        |  8 +-------
 .../task/eventsourcing/distributed/RabbitMQWorkQueue.java |  9 +--------
 4 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java
index f05a307..814e70a 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -29,9 +29,11 @@ import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -39,6 +41,9 @@ import reactor.core.publisher.Mono;
 import reactor.core.publisher.SignalType;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.ChannelPool;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
 
 public class ReactorRabbitMQChannelPool implements ChannelPool {
 
@@ -87,10 +92,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
 
     private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
 
+    private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
     private final ConcurrentSkipListSet<Channel> borrowedChannels;
 
     public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) {
+        this.connectionMono = connectionMono;
         ChannelFactory channelFactory = new ChannelFactory(connectionMono);
 
         GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
@@ -120,6 +127,14 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
         };
     }
 
+    public Sender createSender() {
+       return RabbitFlux.createSender(new SenderOptions()
+           .connectionMono(connectionMono)
+           .channelPool(this)
+           .resourceManagementChannelMono(
+               connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
+    }
+
     private void invalidateObject(Channel channel) {
         try {
             pool.invalidateObject(channel);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index bd53e93..36795c6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -30,16 +30,10 @@ import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
-import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
-
 import reactor.core.publisher.Mono;
-import reactor.rabbitmq.ChannelPool;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
 
 public class RabbitMQEventBus implements EventBus, Startable {
     private static final int MAX_CHANNELS_NUMBER = 5;
@@ -58,7 +52,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
-    private ChannelPool channelPool;
+    private ReactorRabbitMQChannelPool channelPool;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     EventDispatcher eventDispatcher;
@@ -84,8 +78,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
         if (!isRunning && !isStopping) {
             this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
 
-            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool)
-                .resourceManagementChannelMono(connectionMono.map(Throwing.<Connection, Channel>function(Connection::createChannel).sneakyThrow()).cache()));
+            sender = channelPool.createSender();
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index a3a8bb1..89d6e96 100644
--- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
@@ -56,7 +55,6 @@ import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
 
 public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
@@ -83,11 +81,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     }
 
     public void start() {
-        Sender sender = RabbitFlux.createSender(new SenderOptions()
-            .connectionMono(connectionMono)
-            .channelPool(channelPool)
-            .resourceManagementChannelMono(
-                connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
+        Sender sender = channelPool.createSender();
 
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 69cdf16..e954c31 100644
--- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
@@ -51,10 +50,8 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
 
 public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
@@ -80,11 +77,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     public void start() {
-        sender = RabbitFlux.createSender(new SenderOptions()
-            .connectionMono(connectionMono)
-            .channelPool(channelPool)
-            .resourceManagementChannelMono(
-                connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
+        sender = channelPool.createSender();
 
         receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message