james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [06/13] james-project git commit: JAMES-2545 implement a channel pool for rabbitmq cnx
Date Fri, 14 Sep 2018 03:19:30 GMT
JAMES-2545 implement a channel pool for rabbitmq cnx


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3313b53d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3313b53d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3313b53d

Branch: refs/heads/master
Commit: 3313b53d978e5ae52cccd725258bb9804bd4aeee
Parents: 01738f6
Author: Matthieu Baechler <matthieu@apache.org>
Authored: Mon Sep 10 18:11:41 2018 +0200
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Fri Sep 14 10:17:42 2018 +0700

----------------------------------------------------------------------
 backends-common/rabbitmq/pom.xml                |   5 +
 .../backend/rabbitmq/RabbitChannelPool.java     | 103 +++++++++++++++++++
 .../james/queue/rabbitmq/RabbitClient.java      |  43 ++++----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |   3 +-
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  |   3 +-
 5 files changed, 137 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/backends-common/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index d518753..9583901 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -63,6 +63,11 @@
             <artifactId>commons-configuration</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>2.6.0</version>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
new file mode 100644
index 0000000..c4a9fe6
--- /dev/null
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
@@ -0,0 +1,103 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+/**
+ * Pool of RabbitMQ {@link Channel}s opened on a single {@link Connection}.
+ *
+ * <p>Callers never see the pool directly: they pass a callback to one of the
+ * {@code execute} methods, which borrows a channel, runs the callback and
+ * always gives the channel back afterwards.
+ *
+ * <p>Channels are validated when borrowed: a channel that is no longer open
+ * (RabbitMQ closes a channel on channel-level errors) is discarded and
+ * replaced instead of being handed to the next caller.
+ */
+public class RabbitChannelPool {
+
+    /**
+     * commons-pool2 factory that opens channels on the shared connection,
+     * rejects closed channels and closes channels the pool discards.
+     */
+    private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> {
+        private final Connection connection;
+
+        public ChannelBasePooledObjectFactory(Connection connection) {
+            this.connection = connection;
+        }
+
+        @Override
+        public Channel create() throws Exception {
+            return connection.createChannel();
+        }
+
+        @Override
+        public PooledObject<Channel> wrap(Channel obj) {
+            return new DefaultPooledObject<>(obj);
+        }
+
+        /**
+         * A pooled channel is only reusable while it is still open.
+         */
+        @Override
+        public boolean validateObject(PooledObject<Channel> pooledObject) {
+            return pooledObject.getObject().isOpen();
+        }
+
+        /**
+         * Releases the channel when the pool drops it (failed validation,
+         * eviction or pool shutdown). Already-closed channels are left alone
+         * because closing them again would only raise an error.
+         */
+        @Override
+        public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
+            Channel channel = pooledObject.getObject();
+            if (channel.isOpen()) {
+                channel.close();
+            }
+        }
+    }
+
+    /**
+     * Callback that uses a borrowed channel and produces a result.
+     *
+     * @param <T> type of the result
+     * @param <E> type of exception the callback may throw
+     */
+    @FunctionalInterface
+    public interface RabbitFunction<T, E extends Throwable> {
+        T execute(Channel channel) throws E;
+    }
+
+    /**
+     * Callback that uses a borrowed channel without producing a result.
+     *
+     * @param <E> type of exception the callback may throw
+     */
+    @FunctionalInterface
+    public interface RabbitConsumer<E extends Throwable> {
+        void execute(Channel channel) throws E;
+    }
+
+    private final ObjectPool<Channel> pool;
+
+    /**
+     * Creates a pool whose channels are all opened on {@code connection}.
+     *
+     * @param connection connection used to create every pooled channel
+     */
+    public RabbitChannelPool(Connection connection) {
+        GenericObjectPool<Channel> channelPool = new GenericObjectPool<>(
+            new ChannelBasePooledObjectFactory(connection));
+        // Without validation a channel closed by the broker would be handed
+        // out again and make every subsequent operation fail.
+        channelPool.setTestOnBorrow(true);
+        pool = channelPool;
+    }
+
+    /**
+     * Runs {@code f} with a pooled channel and returns its result.
+     * The channel is returned to the pool whether or not {@code f} throws.
+     *
+     * @param f callback receiving the borrowed channel
+     * @return whatever {@code f} returns
+     * @throws E when {@code f} throws it
+     * @throws RuntimeException when borrowing or returning the channel fails
+     */
+    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E {
+        Channel channel = borrowChannel();
+        try {
+            return f.execute(channel);
+        } finally {
+            returnChannel(channel);
+        }
+    }
+
+
+    /**
+     * Runs {@code f} with a pooled channel.
+     * The channel is returned to the pool whether or not {@code f} throws.
+     *
+     * @param f callback receiving the borrowed channel
+     * @throws E when {@code f} throws it
+     * @throws RuntimeException when borrowing or returning the channel fails
+     */
+    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E {
+        Channel channel = borrowChannel();
+        try {
+            f.execute(channel);
+        } finally {
+            returnChannel(channel);
+        }
+    }
+
+    /** Borrows a channel, rethrowing any pool failure as unchecked. */
+    private Channel borrowChannel() {
+        try {
+            return pool.borrowObject();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Gives a channel back to the pool, rethrowing any pool failure as unchecked. */
+    private void returnChannel(Channel channel) {
+        try {
+            pool.returnObject(channel);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
index d5a945d..ae6b1be 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
@@ -22,11 +22,11 @@ package org.apache.james.queue.rabbitmq;
 import java.io.IOException;
 import java.util.Optional;
 
+import org.apache.james.backend.rabbitmq.RabbitChannelPool;
 import org.apache.james.queue.api.MailQueue;
 
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.GetResponse;
 
 class RabbitClient {
@@ -39,35 +39,42 @@ class RabbitClient {
     private static final ImmutableMap<String, Object> NO_ARGUMENTS = ImmutableMap.of();
     private static final String ROUTING_KEY = "";
 
-    private final Channel channel;
+    private final RabbitChannelPool channelPool;
 
-    RabbitClient(Channel channel) {
-        this.channel = channel;
+    RabbitClient(RabbitChannelPool channelPool) {
+        this.channelPool = channelPool;
     }
 
     void attemptQueueCreation(MailQueueName name) {
-        try {
-            channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE);
-            channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE,
!AUTO_DELETE, NO_ARGUMENTS);
-            channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(),
ROUTING_KEY);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        channelPool.execute(channel -> {
+            try {
+                channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct",
DURABLE);
+                channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE,
!AUTO_DELETE, NO_ARGUMENTS);
+                channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(),
ROUTING_KEY);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException
{
-        try {
-            channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY, new
AMQP.BasicProperties(), message);
-        } catch (IOException e) {
-            throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e);
-        }
+        channelPool.execute(channel -> {
+            try {
+                channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY,
new AMQP.BasicProperties(), message);
+            } catch (IOException e) {
+                throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e);
+            }
+        });
     }
 
     void ack(long deliveryTag) throws IOException {
-        channel.basicAck(deliveryTag, !MULTIPLE);
+        RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicAck(deliveryTag,
!MULTIPLE);
+        channelPool.execute(consumer);
     }
 
     Optional<GetResponse> poll(MailQueueName name) throws IOException {
-        return Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK));
+        RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException>
f = channel ->
+            Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK));
+        return channelPool.execute(f);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index aaf57f7..c2d5c07 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -28,6 +28,7 @@ import javax.mail.internet.MimeMessage;
 
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backend.rabbitmq.RabbitChannelPool;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraExtension;
@@ -70,7 +71,7 @@ public class RabbitMQMailQueueTest implements MailQueueContract {
             .setPort(rabbitMQ.getAdminPort())
             .build();
 
-        RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel());
+        RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection()));
         RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore,
BLOB_ID_FACTORY);
         RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri,
new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);

http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index cf1cb88..bbb4734 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -28,6 +28,7 @@ import javax.mail.internet.MimeMessage;
 
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backend.rabbitmq.RabbitChannelPool;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraExtension;
@@ -70,7 +71,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             .setPort(rabbitMQ.getAdminPort())
             .build();
 
-        RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel());
+        RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection()));
         RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore,
BLOB_ID_FACTORY);
         RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri,
new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message