james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [14/19] james-project git commit: JAMES-2541 Implement the MailQueue API on top of RabbitMQ
Date Mon, 10 Sep 2018 10:34:17 GMT
JAMES-2541 Implement the MailQueue API on top of RabbitMQ

Handle all fields of a mail


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/197ea1ed
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/197ea1ed
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/197ea1ed

Branch: refs/heads/master
Commit: 197ea1ed45738b6e04a09187997485051ae4d749
Parents: 5f68dd5
Author: Benoit Tellier <btellier@linagora.com>
Authored: Wed Sep 5 16:13:43 2018 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Mon Sep 10 17:19:38 2018 +0700

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |  24 +++++
 .../apache/james/queue/rabbitmq/MailDTO.java    | 104 ++++++++++++++++++-
 .../james/queue/rabbitmq/RabbitClient.java      |  13 ++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java |  96 +++++++++++++++--
 .../rabbitmq/RabbitMQMailQueueFactory.java      |  18 ++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |  94 +++++++----------
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  |  45 +++++++-
 7 files changed, 306 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 2339392..1dc37ef 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -39,10 +39,34 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-core</artifactId>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-mail-store</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-queue-api</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java
index 3022154..76ed232 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java
@@ -19,49 +19,147 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import java.time.Instant;
 import java.util.Collection;
+import java.util.Map;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.core.MailAddress;
+import org.apache.james.util.SerializationUtil;
+import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Mail;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 class MailDTO {
 
-    static MailDTO fromMail(Mail mail) {
+    static MailDTO fromMail(Mail mail, MimeMessagePartsId partsId) {
         return new MailDTO(
             mail.getRecipients().stream()
                 .map(MailAddress::asString)
                 .collect(Guavate.toImmutableList()),
             mail.getName(),
-            mail.getSender().asString());
+            mail.getSender().asString(),
+            mail.getState(),
+            mail.getErrorMessage(),
+            mail.getLastUpdated().toInstant(),
+            serializedAttributes(mail),
+            mail.getRemoteAddr(),
+            mail.getRemoteHost(),
+            SerializationUtil.serialize(mail.getPerRecipientSpecificHeaders()),
+            partsId.getHeaderBlobId().asString(),
+            partsId.getBodyBlobId().asString());
+    }
+
+    private static ImmutableMap<String, String> serializedAttributes(Mail mail) {
+        return Iterators.toStream(mail.getAttributeNames())
+            .collect(Guavate.toImmutableMap(
+                name -> name,
+                name -> SerializationUtil.serialize(mail.getAttribute(name))));
     }
 
     private final ImmutableList<String> recipients;
     private final String name;
     private final String sender;
+    private final String state;
+    private final String errorMessage;
+    private final Instant lastUpdated;
+    private final ImmutableMap<String, String> attributes;
+    private final String remoteAddr;
+    private final String remoteHost;
+    private final String perRecipientHeaders;
+    private final String headerBlobId;
+    private final String bodyBlobId;
 
     @JsonCreator
     private MailDTO(@JsonProperty("recipients") ImmutableList<String> recipients,
                     @JsonProperty("name") String name,
-                    @JsonProperty("sender") String sender) {
+                    @JsonProperty("sender") String sender,
+                    @JsonProperty("state") String state,
+                    @JsonProperty("errorMessage") String errorMessage,
+                    @JsonProperty("lastUpdated") Instant lastUpdated,
+                    @JsonProperty("attributes") ImmutableMap<String, String> attributes,
+                    @JsonProperty("remoteAddr") String remoteAddr,
+                    @JsonProperty("remoteHost") String remoteHost,
+                    @JsonProperty("perRecipientHeaders") String perRecipientHeaders,
+                    @JsonProperty("headerBlobId") String headerBlobId,
+                    @JsonProperty("bodyBlobId") String bodyBlobId) {
         this.recipients = recipients;
         this.name = name;
         this.sender = sender;
+        this.state = state;
+        this.errorMessage = errorMessage;
+        this.lastUpdated = lastUpdated;
+        this.attributes = attributes;
+        this.remoteAddr = remoteAddr;
+        this.remoteHost = remoteHost;
+        this.perRecipientHeaders = perRecipientHeaders;
+        this.headerBlobId = headerBlobId;
+        this.bodyBlobId = bodyBlobId;
     }
 
+    @JsonProperty("recipients")
     Collection<String> getRecipients() {
         return recipients;
     }
 
+    @JsonProperty("name")
     String getName() {
         return name;
     }
 
+    @JsonProperty("sender")
     String getSender() {
         return sender;
     }
+
+    @JsonProperty("state")
+    String getState() {
+        return state;
+    }
+
+    @JsonProperty("errorMessage")
+    String getErrorMessage() {
+        return errorMessage;
+    }
+
+    @JsonProperty("lastUpdated")
+    Instant getLastUpdated() {
+        return lastUpdated;
+    }
+
+    @JsonProperty("attributes")
+    Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    @JsonProperty("remoteAddr")
+    String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    @JsonProperty("remoteHost")
+    String getRemoteHost() {
+        return remoteHost;
+    }
+
+    @JsonProperty("perRecipientHeaders")
+    String getPerRecipientHeaders() {
+        return perRecipientHeaders;
+    }
+
+    @JsonProperty("headerBlobId")
+    String getHeaderBlobId() {
+        return headerBlobId;
+    }
+
+    @JsonProperty("bodyBlobId")
+    String getBodyBlobId() {
+        return bodyBlobId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
index 7439956..d5a945d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
@@ -45,20 +45,19 @@ class RabbitClient {
         this.channel = channel;
     }
 
-    RabbitMQMailQueue attemptQueueCreation(MailQueueName name) {
+    void attemptQueueCreation(MailQueueName name) {
         try {
-            channel.exchangeDeclare(name.toRabbitExchangeName(), "direct", DURABLE);
-            channel.queueDeclare(name.toRabbitWorkQueueName(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS);
-            channel.queueBind(name.toRabbitWorkQueueName(), name.toRabbitExchangeName(), ROUTING_KEY);
+            channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE);
+            channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS);
+            channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), ROUTING_KEY);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return new RabbitMQMailQueue(name, this);
     }
 
     void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException {
         try {
-            channel.basicPublish(name.toRabbitExchangeName(), ROUTING_KEY, new AMQP.BasicProperties(), message);
+            channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY, new AMQP.BasicProperties(), message);
         } catch (IOException e) {
             throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e);
         }
@@ -69,6 +68,6 @@ class RabbitClient {
     }
 
     Optional<GetResponse> poll(MailQueueName name) throws IOException {
-        return Optional.ofNullable(channel.basicGet(name.toRabbitWorkQueueName(), !AUTO_ACK));
+        return Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 68a3d67..c40fe6b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -20,13 +20,28 @@
 package org.apache.james.queue.rabbitmq;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.inject.Inject;
+import javax.mail.MessagingException;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.core.MailAddress;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.server.core.MailImpl;
+import org.apache.james.util.SerializationUtil;
 import org.apache.mailet.Mail;
+import org.apache.mailet.PerRecipientHeaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +52,12 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 import com.rabbitmq.client.GetResponse;
 
 public class RabbitMQMailQueue implements MailQueue {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQMailQueue.class);
 
     private static class NoMailYetException extends RuntimeException {
@@ -72,13 +89,34 @@ public class RabbitMQMailQueue implements MailQueue {
         }
     }
 
+    static class Factory {
+        private final RabbitClient rabbitClient;
+        private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
+        private final BlobId.Factory blobIdFactory;
+
+        @Inject
+        @VisibleForTesting Factory(RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) {
+            this.rabbitClient = rabbitClient;
+            this.mimeMessageStore = mimeMessageStore;
+            this.blobIdFactory = blobIdFactory;
+        }
+
+        RabbitMQMailQueue create(MailQueueName mailQueueName) {
+            return new RabbitMQMailQueue(mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory);
+        }
+    }
+
     private static final int TEN_MS = 10;
 
     private final MailQueueName name;
     private final RabbitClient rabbitClient;
+    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
+    private final BlobId.Factory blobIdFactory;
     private final ObjectMapper objectMapper;
 
-    RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient) {
+    RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) {
+        this.mimeMessageStore = mimeMessageStore;
+        this.blobIdFactory = blobIdFactory;
         this.name = name;
         this.rabbitClient = rabbitClient;
         this.objectMapper = new ObjectMapper()
@@ -102,11 +140,20 @@ public class RabbitMQMailQueue implements MailQueue {
 
     @Override
     public void enQueue(Mail mail) throws MailQueueException {
-        MailDTO mailDTO = MailDTO.fromMail(mail);
+        MimeMessagePartsId partsId = saveBlobs(mail).join();
+        MailDTO mailDTO = MailDTO.fromMail(mail, partsId);
         byte[] message = getMessageBytes(mailDTO);
         rabbitClient.publish(name, message);
     }
 
+    private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws MailQueueException {
+        try {
+            return mimeMessageStore.save(mail.getMessage());
+        } catch (MessagingException e) {
+            throw new MailQueueException("Error while saving blob", e);
+        }
+    }
+
     private byte[] getMessageBytes(MailDTO mailDTO) throws MailQueueException {
         try {
             return objectMapper.writeValueAsBytes(mailDTO);
@@ -115,7 +162,6 @@ public class RabbitMQMailQueue implements MailQueue {
         }
     }
 
-
     @Override
     public MailQueueItem deQueue() throws MailQueueException {
         GetResponse getResponse = pollChannel();
@@ -147,13 +193,41 @@ public class RabbitMQMailQueue implements MailQueue {
             .orElseThrow(NoMailYetException::new);
     }
 
-    private Mail toMail(MailDTO dto) {
-        return new MailImpl(
-            dto.getName(),
-            MailAddress.getMailSender(dto.getSender()),
-            dto.getRecipients()
-                .stream()
-                .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow())
-                .collect(Guavate.toImmutableList()));
+    private Mail toMail(MailDTO dto) throws MailQueueException {
+        try {
+            MimeMessage mimeMessage = mimeMessageStore.read(
+                MimeMessagePartsId.builder()
+                    .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId()))
+                    .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId()))
+                    .build())
+                .join();
+
+            MailImpl mail = new MailImpl(
+                dto.getName(),
+                MailAddress.getMailSender(dto.getSender()),
+                dto.getRecipients()
+                    .stream()
+                    .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow())
+                    .collect(Guavate.toImmutableList()),
+                mimeMessage);
+
+            mail.setErrorMessage(dto.getErrorMessage());
+            mail.setRemoteAddr(dto.getRemoteAddr());
+            mail.setRemoteHost(dto.getRemoteHost());
+            mail.setState(dto.getState());
+            mail.setLastUpdated(new Date(dto.getLastUpdated().toEpochMilli()));
+
+            dto.getAttributes()
+                .forEach((name, value) -> mail.setAttribute(name, SerializationUtil.<Serializable>deserialize(value)));
+
+            Optional.ofNullable(SerializationUtil.<PerRecipientHeaders>deserialize(dto.getPerRecipientHeaders()))
+                .ifPresent(mail::addAllSpecificHeaderForRecipient);
+
+            return mail;
+        } catch (AddressException e) {
+            throw new MailQueueException("Failed to parse mail address", e);
+        } catch (MessagingException e) {
+            throw new MailQueueException("Failed to generate mime message", e);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 1e541aa..784cf50 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,17 +29,18 @@ import org.apache.james.queue.api.MailQueueFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
-import com.rabbitmq.client.Connection;
 
 public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
     private final RabbitClient rabbitClient;
     private final RabbitMQManagementApi mqManagementApi;
+    private final RabbitMQMailQueue.Factory mailQueueFactory;
 
     @VisibleForTesting
     @Inject
-    RabbitMQMailQueueFactory(Connection connection, RabbitMQManagementApi mqManagementApi) throws IOException {
-        this.rabbitClient = new RabbitClient(connection.createChannel());
+    RabbitMQMailQueueFactory(RabbitClient rabbitClient, RabbitMQManagementApi mqManagementApi, RabbitMQMailQueue.Factory mailQueueFactory) throws IOException {
+        this.rabbitClient = rabbitClient;
         this.mqManagementApi = mqManagementApi;
+        this.mailQueueFactory = mailQueueFactory;
     }
 
     @Override
@@ -51,20 +52,25 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     public RabbitMQMailQueue createQueue(String name) {
         MailQueueName mailQueueName = MailQueueName.fromString(name);
         return getQueue(mailQueueName)
-            .orElseGet(() -> rabbitClient.attemptQueueCreation(mailQueueName));
+            .orElseGet(() -> attemptQueueCreation(mailQueueName));
     }
 
     @Override
     public Set<RabbitMQMailQueue> listCreatedMailQueues() {
         return mqManagementApi.listCreatedMailQueueNames()
-            .map(name -> new RabbitMQMailQueue(name, rabbitClient))
+            .map(mailQueueFactory::create)
             .collect(Guavate.toImmutableSet());
     }
 
+    private RabbitMQMailQueue attemptQueueCreation(MailQueueName mailQueueName) {
+        rabbitClient.attemptQueueCreation(mailQueueName);
+        return mailQueueFactory.create(mailQueueName);
+    }
+
     private Optional<RabbitMQMailQueue> getQueue(MailQueueName name) {
         return mqManagementApi.listCreatedMailQueueNames()
             .filter(name::equals)
-            .map(queueName -> new RabbitMQMailQueue(queueName, rabbitClient))
+            .map(mailQueueFactory::create)
             .findFirst();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 9573ab8..a75f761 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -24,87 +24,67 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.TimeoutException;
 
+import javax.mail.internet.MimeMessage;
+
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobsDAO;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueContract;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-@ExtendWith(DockerRabbitMQExtension.class)
+@ExtendWith({DockerRabbitMQExtension.class, DockerCassandraExtension.class})
 public class RabbitMQMailQueueTest implements MailQueueContract {
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+
+    private static CassandraCluster cassandra;
 
     private RabbitMQMailQueueFactory mailQueueFactory;
 
+    @BeforeAll
+    static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) {
+        cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost());
+    }
+
     @BeforeEach
     void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException {
+        CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
+        Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore();
 
         URI rabbitManagementUri = new URIBuilder()
             .setScheme("http")
             .setHost(rabbitMQ.getHostIp())
             .setPort(rabbitMQ.getAdminPort())
             .build();
-        mailQueueFactory = new RabbitMQMailQueueFactory(
-            rabbitMQ.connectionFactory().newConnection(),
-            new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())));
-    }
-
-    @Override
-    public MailQueue getMailQueue() {
-        return mailQueueFactory.createQueue("spool");
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveMimeMessage() {
-
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveMailAttribute() {
-
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveErrorMessage() {
-
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveState() {
-
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveRemoteAddress() {
-
-    }
-
-    @Disabled
-    @Override
-    public void queueShouldPreserveRemoteHost() {
-
+        RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel());
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
+        RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
     }
 
-    @Disabled
-    @Override
-    public void queueShouldPreserveLastUpdated() {
-
+    @AfterEach
+    void tearDown() {
+        cassandra.clearTables();
     }
 
-    @Disabled
-    @Override
-    public void queueShouldPreservePerRecipientHeaders() {
-
+    @AfterAll
+    static void tearDownClass() {
+        cassandra.closeCluster();
     }
 
-    @Disabled
     @Override
-    public void queueShouldPreserveNonStringMailAttribute() {
-
+    public MailQueue getMailQueue() {
+        return mailQueueFactory.createQueue("spool");
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 028d125..60b3e10 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -24,27 +24,64 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.TimeoutException;
 
+import javax.mail.internet.MimeMessage;
+
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobsDAO;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-@ExtendWith(DockerRabbitMQExtension.class)
+@ExtendWith({DockerRabbitMQExtension.class, DockerCassandraExtension.class})
 class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> {
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+
+    private static CassandraCluster cassandra;
 
     private RabbitMQMailQueueFactory mailQueueFactory;
 
+    @BeforeAll
+    static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) {
+        cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost());
+    }
+
     @BeforeEach
     void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException {
+        CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
+        Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore();
+
         URI rabbitManagementUri = new URIBuilder()
             .setScheme("http")
             .setHost(rabbitMQ.getHostIp())
             .setPort(rabbitMQ.getAdminPort())
             .build();
-        mailQueueFactory = new RabbitMQMailQueueFactory(
-            rabbitMQ.connectionFactory().newConnection(),
-            new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())));
+
+        RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel());
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
+        RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
+    }
+
+    @AfterEach
+    void tearDown() {
+        cassandra.clearTables();
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        cassandra.closeCluster();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message