james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [1/8] james-project git commit: JAMES-2544 RabbitMQ should register gauge size
Date Wed, 03 Oct 2018 09:45:05 GMT
Repository: james-project
Updated Branches:
  refs/heads/master 0ca09252a -> 8e9a0fd35


JAMES-2544 RabbitMQ should register gauge size

- RabbitMQMailQueueFactory should keep a local store of the queues it has created:
each time the factory's `getQueue` is called, it attempts to create a new `RabbitMQMailQueue`
instance, which is redundant and registers the gauge again in the `RabbitMQMailQueue` constructor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/62cd92fe
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/62cd92fe
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/62cd92fe

Branch: refs/heads/master
Commit: 62cd92feda6cbad999d2d5225300a9c6698c02b7
Parents: 0ca0925
Author: duc <dtran@linagora.com>
Authored: Wed Sep 19 11:05:59 2018 +0700
Committer: duc <dtran@linagora.com>
Committed: Wed Oct 3 12:06:00 2018 +0700

----------------------------------------------------------------------
 .../james/queue/rabbitmq/RabbitMQMailQueue.java | 20 ++++++++---
 .../rabbitmq/RabbitMQMailQueueFactory.java      | 23 +++++++++----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |  1 +
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  | 35 +++++++++++++++++++-
 4 files changed, 67 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/62cd92fe/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 443b94f..fe6487a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -29,6 +29,7 @@ import javax.mail.internet.MimeMessage;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
@@ -46,6 +47,7 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
 
     static class Factory {
         private final MetricFactory metricFactory;
+        private final GaugeRegistry gaugeRegistry;
         private final RabbitClient rabbitClient;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
@@ -54,12 +56,14 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
         private final Clock clock;
 
         @Inject
-        @VisibleForTesting Factory(MetricFactory metricFactory, RabbitClient rabbitClient,
+        @VisibleForTesting Factory(MetricFactory metricFactory, GaugeRegistry gaugeRegistry,
+                                   RabbitClient rabbitClient,
                                    Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
                                    BlobId.Factory blobIdFactory,
                                    MailQueueView mailQueueView,
                                    Clock clock) {
             this.metricFactory = metricFactory;
+            this.gaugeRegistry = gaugeRegistry;
             this.rabbitClient = rabbitClient;
             this.mimeMessageStore = mimeMessageStore;
             this.mailQueueView = mailQueueView;
@@ -71,28 +75,36 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
         RabbitMQMailQueue create(MailQueueName mailQueueName) {
             mailQueueView.initialize(mailQueueName);
 
-            return new RabbitMQMailQueue(metricFactory, mailQueueName,
+            return new RabbitMQMailQueue(
+                metricFactory,
+                mailQueueName,
+                gaugeRegistry,
                 new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),
                 new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer,
-                    metricFactory, mailQueueView), mailQueueView);
+                    metricFactory, mailQueueView),
+                mailQueueView);
         }
     }
 
     private final MailQueueName name;
     private final MetricFactory metricFactory;
+    private final GaugeRegistry gaugeRegistry;
     private final Enqueuer enqueuer;
     private final Dequeuer dequeuer;
     private final MailQueueView mailQueueView;
 
     RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name,
-                      Enqueuer enqueuer, Dequeuer dequeuer,
+                      GaugeRegistry gaugeRegistry, Enqueuer enqueuer, Dequeuer dequeuer,
                       MailQueueView mailQueueView) {
         this.metricFactory = metricFactory;
         this.name = name;
+        this.gaugeRegistry = gaugeRegistry;
         this.enqueuer = enqueuer;
         this.dequeuer = dequeuer;
         this.mailQueueView = mailQueueView;
+
+        this.gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + name.asString(), this::getSize);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/62cd92fe/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 784cf50..5d0e629 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -19,28 +19,33 @@
 
 package org.apache.james.queue.rabbitmq;
 
-import java.io.IOException;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.inject.Inject;
 
 import org.apache.james.queue.api.MailQueueFactory;
 
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 
 public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
     private final RabbitClient rabbitClient;
     private final RabbitMQManagementApi mqManagementApi;
     private final RabbitMQMailQueue.Factory mailQueueFactory;
 
+    private final ConcurrentHashMap<MailQueueName, RabbitMQMailQueue> mailQueueStoreLocal;
+
     @VisibleForTesting
     @Inject
-    RabbitMQMailQueueFactory(RabbitClient rabbitClient, RabbitMQManagementApi mqManagementApi, RabbitMQMailQueue.Factory mailQueueFactory) throws IOException {
+    RabbitMQMailQueueFactory(RabbitClient rabbitClient,
+                             RabbitMQManagementApi mqManagementApi,
+                             RabbitMQMailQueue.Factory mailQueueFactory) {
         this.rabbitClient = rabbitClient;
         this.mqManagementApi = mqManagementApi;
         this.mailQueueFactory = mailQueueFactory;
+        this.mailQueueStoreLocal = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -58,19 +63,23 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     @Override
     public Set<RabbitMQMailQueue> listCreatedMailQueues() {
         return mqManagementApi.listCreatedMailQueueNames()
-            .map(mailQueueFactory::create)
-            .collect(Guavate.toImmutableSet());
+            .map(this::getOrElseCreateLocally)
+            .collect(ImmutableSet.toImmutableSet());
     }
 
     private RabbitMQMailQueue attemptQueueCreation(MailQueueName mailQueueName) {
         rabbitClient.attemptQueueCreation(mailQueueName);
-        return mailQueueFactory.create(mailQueueName);
+        return getOrElseCreateLocally(mailQueueName);
     }
 
     private Optional<RabbitMQMailQueue> getQueue(MailQueueName name) {
         return mqManagementApi.listCreatedMailQueueNames()
             .filter(name::equals)
-            .map(mailQueueFactory::create)
+            .map(this::getOrElseCreateLocally)
             .findFirst();
     }
+
+    private RabbitMQMailQueue getOrElseCreateLocally(MailQueueName name) {
+        return mailQueueStoreLocal.computeIfAbsent(name, mailQueueFactory::create);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/62cd92fe/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 109e47e..2d7c2b9 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -131,6 +131,7 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
         RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory));
         RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(
             metricTestSystem.getSpyMetricFactory(),
+            metricTestSystem.getSpyGaugeRegistry(),
             rabbitClient,
             mimeMessageStore,
             BLOB_ID_FACTORY,

http://git-wip-us.apache.org/repos/asf/james-project/blob/62cd92fe/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index aa2d553..0a1e7d1 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -20,12 +20,16 @@
 package org.apache.james.queue.rabbitmq;
 
 import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.time.Clock;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.mail.internet.MimeMessage;
@@ -38,11 +42,14 @@ import org.apache.james.backend.rabbitmq.RabbitMQExtension;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
@@ -69,7 +76,14 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory));
-        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY, mailQueueView, Clock.systemUTC());
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(
+            new NoopMetricFactory(),
+            new NoopGaugeRegistry(),
+            rabbitClient,
+            mimeMessageStore,
+            BLOB_ID_FACTORY,
+            mailQueueView,
+            Clock.systemUTC());
         RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitMQConfiguration);
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
     }
@@ -78,4 +92,23 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
     public MailQueueFactory<RabbitMQMailQueue> getMailQueueFactory() {
         return mailQueueFactory;
     }
+
+    @Test
+    void createQueueShouldReturnTheSameInstanceWhenParallelCreateSameQueueName() throws Exception {
+        Set<RabbitMQMailQueue> createdRabbitMQMailQueues =  ConcurrentHashMap.newKeySet();
+
+        ConcurrentTestRunner.builder()
+            .threadCount(100)
+            .operationCount(10)
+            .build((threadNumber, operationNumber) ->
+                createdRabbitMQMailQueues.add(mailQueueFactory.createQueue("spool")))
+            .run()
+            .awaitTermination(10, TimeUnit.MINUTES);
+
+        assertThat(mailQueueFactory.listCreatedMailQueues())
+            .hasSize(1)
+            .isEqualTo(createdRabbitMQMailQueues)
+            .extracting(RabbitMQMailQueue::getName)
+            .hasOnlyOneElementSatisfying(queueName -> assertThat(queueName).isEqualTo("spool"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message