james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [1/2] james-project git commit: JAMES-2544 Store mime message as blobs for enqueued mail
Date Wed, 03 Oct 2018 02:32:35 GMT
Repository: james-project
Updated Branches:
  refs/heads/master 8a7b69a1a -> 0ca09252a


JAMES-2544 Store mime message as blobs for enqueued mail


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a076d135
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a076d135
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a076d135

Branch: refs/heads/master
Commit: a076d135ef5a69e10ff94cad750dce63d90e0486
Parents: 8a7b69a
Author: Antoine Duprat <aduprat@linagora.com>
Authored: Thu Sep 27 11:59:44 2018 +0200
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Wed Oct 3 09:30:40 2018 +0700

----------------------------------------------------------------------
 .../org/apache/james/metrics/api/Metric.java    |   1 -
 .../queue/api/ManageableMailQueueContract.java  |  24 +++
 .../james/queue/file/FileMailQueueTest.java     |   7 +
 server/queue/queue-rabbitmq/pom.xml             |  10 +-
 .../apache/james/queue/rabbitmq/Enqueuer.java   |  13 +-
 .../queue/rabbitmq/view/api/MailQueueView.java  |   4 +-
 .../cassandra/CassandraMailQueueBrowser.java    |  45 +++++-
 .../cassandra/CassandraMailQueueMailDelete.java |   3 +-
 .../cassandra/CassandraMailQueueMailStore.java  |  25 ++-
 .../view/cassandra/CassandraMailQueueView.java  |  14 +-
 .../view/cassandra/EnqueuedMailsDAO.java        |  38 +++--
 .../view/cassandra/EnqueuedMailsDaoUtil.java    |  32 ++--
 .../view/cassandra/model/EnqueuedMail.java      | 152 -------------------
 ...abbitMQMailQueueConfigurationChangeTest.java |   5 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |   9 +-
 .../CassandraMailQueueViewTestFactory.java      |  14 +-
 .../view/cassandra/EnqueuedMailsDaoTest.java    |  86 ++++++-----
 .../view/cassandra/model/EnqueuedMailTest.java  |  33 ----
 18 files changed, 232 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
----------------------------------------------------------------------
diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
index 1e0aa1a..fdf75b6 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/Metric.java
@@ -28,5 +28,4 @@ public interface Metric {
     void add(int value);
 
     void remove(int value);
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index 6520288..48f422b 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -28,7 +28,11 @@ import static org.apache.mailet.base.MailAddressFixture.RECIPIENT3;
 import static org.apache.mailet.base.MailAddressFixture.SENDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
 
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.core.builder.MimeMessageBuilder;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.MailAddressFixture;
 import org.junit.jupiter.api.Test;
@@ -421,6 +425,26 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     }
 
     @Test
+    default void browseShouldReturnMailsWithMimeMessage() throws Exception {
+        ManageableMailQueue mailQueue = getManageableMailQueue();
+        mailQueue.enQueue(defaultMail()
+            .name("mail with blob")
+            .mimeMessage(MimeMessageBuilder.mimeMessageBuilder()
+                .setSubject("mail subject")
+                .setText("mail body")
+                .build())
+            .build());
+
+        MimeMessage mimeMessage = mailQueue.browse().next().getMail().getMessage();
+        String subject = mimeMessage.getSubject();
+        Object content = mimeMessage.getContent();
+
+        assertSoftly(softly ->  {
+            softly.assertThat(subject).isEqualTo("mail subject");
+            softly.assertThat(content).isEqualTo("mail body");
+        });
+    }
+    @Test
     default void browsingShouldNotAffectDequeue() throws Exception {
         enQueue(defaultMail()
             .name("name1")

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
index f044d8c..4ad3753 100644
--- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
+++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
@@ -97,4 +97,11 @@ public class FileMailQueueTest implements DelayedManageableMailQueueContract {
     public void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() {
 
     }
+
+    @Test
+    @Override
+    @Disabled("JAMES-2544 Not supported yet")
+    public void browseShouldReturnMailsWithMimeMessage() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index e8cc08d..d55ddec 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -38,11 +38,11 @@
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.james</groupId>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-rabbitmq</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.james</groupId>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-rabbitmq</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
@@ -68,6 +68,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>event-sourcing-core</artifactId>
         </dependency>
         <dependency>
@@ -78,7 +83,6 @@
             <groupId>${james.groupId}</groupId>
             <artifactId>event-sourcing-event-store-cassandra</artifactId>
             <type>test-jar</type>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index f1d4ca4..b0dbc84 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -61,9 +61,9 @@ class Enqueuer {
 
     void enQueue(Mail mail) throws MailQueue.MailQueueException {
         saveMail(mail)
-            .thenAccept(Throwing.<MimeMessagePartsId>consumer(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow())
+            .thenApply(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow())
+            .thenApply(mailQueueView::storeMail)
             .thenRun(enqueueMetric::increment)
-            .thenCompose(any -> mailQueueView.storeMail(clock.instant(), mail))
             .join();
     }
 
@@ -75,8 +75,15 @@ class Enqueuer {
         }
     }
 
-    private void publishReferenceToRabbit(Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException {
+    private EnqueuedItem publishReferenceToRabbit(Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException {
         rabbitClient.publish(name, getMailReferenceBytes(mail, partsId));
+
+        return EnqueuedItem.builder()
+            .mailQueueName(name)
+            .mail(mail)
+            .enqueuedTime(clock.instant())
+            .mimeMessagePartsId(partsId)
+            .build();
     }
 
     private byte[] getMailReferenceBytes(Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 35740d7..0b38913 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -19,10 +19,10 @@
 
 package org.apache.james.queue.rabbitmq.view.api;
 
-import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.mailet.Mail;
 
@@ -30,7 +30,7 @@ public interface MailQueueView {
 
     void initialize(MailQueueName mailQueueName);
 
-    CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail);
+    CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);
 
     CompletableFuture<Long> delete(DeleteCondition deleteCondition);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 26b308d..7ef0753 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -33,12 +33,20 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
 
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.util.FluentFutureStream;
+import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -68,9 +76,12 @@ class CassandraMailQueueBrowser {
         }
     }
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailQueueBrowser.class);
+
     private final BrowseStartDAO browseStartDao;
     private final DeletedMailsDAO deletedMailsDao;
     private final EnqueuedMailsDAO enqueuedMailsDao;
+    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final CassandraMailQueueViewConfiguration configuration;
     private final Clock clock;
 
@@ -78,41 +89,61 @@ class CassandraMailQueueBrowser {
     CassandraMailQueueBrowser(BrowseStartDAO browseStartDao,
                               DeletedMailsDAO deletedMailsDao,
                               EnqueuedMailsDAO enqueuedMailsDao,
+                              Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
                               CassandraMailQueueViewConfiguration configuration,
                               Clock clock) {
         this.browseStartDao = browseStartDao;
         this.deletedMailsDao = deletedMailsDao;
         this.enqueuedMailsDao = enqueuedMailsDao;
+        this.mimeMessageStore = mimeMessageStore;
         this.configuration = configuration;
         this.clock = clock;
     }
 
     CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
         return browseReferences(queueName)
-            .map(EnqueuedMail::getMail)
+            .map(this::toMailFuture, FluentFutureStream::unboxFuture)
             .map(ManageableMailQueue.MailQueueItemView::new)
             .completableFuture();
     }
 
-    FluentFutureStream<EnqueuedMail> browseReferences(MailQueueName queueName) {
+    FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
         return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
             .thenApply(this::allSlicesStartingAt))
             .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
     }
 
-    private FluentFutureStream<EnqueuedMail> browseSlice(MailQueueName queueName, Slice slice) {
+    private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+        EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
+        return mimeMessageStore.read(enqueuedItem.getPartsId())
+            .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+    }
+
+    private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
+        Mail mail = enqueuedItem.getMail();
+
+        try {
+            mail.setMessage(mimeMessage);
+        } catch (MessagingException e) {
+            LOGGER.error("error while setting mime message to mail {}", mail.getName(), e);
+        }
+
+        return mail;
+    }
+
+    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
         return FluentFutureStream.of(
             allBucketIds()
                 .map(bucketId ->
                     browseBucket(queueName, slice, bucketId).completableFuture()),
             FluentFutureStream::unboxStream)
-            .sorted(Comparator.comparing(EnqueuedMail::getEnqueuedTime));
+            .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
-    private FluentFutureStream<EnqueuedMail> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
+    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
         return FluentFutureStream.of(
             enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
-                .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getMailKey()));
+                .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
     }
 
     private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index 2bf01fe..f4d641c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -29,7 +29,6 @@ import javax.inject.Inject;
 
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 import org.apache.mailet.Mail;
 
@@ -77,7 +76,7 @@ class CassandraMailQueueMailDelete {
 
     private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
-            .map(EnqueuedMail::getTimeRangeStart)
+            .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
             .completableFuture()
             .thenApply(Stream::findFirst);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index 40b1274..4fc2d02 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -25,11 +25,11 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.mailet.Mail;
 
 class CassandraMailQueueMailStore {
@@ -50,10 +50,10 @@ class CassandraMailQueueMailStore {
         this.clock = clock;
     }
 
-    CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) {
-        EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName, enqueuedTime);
+    CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+        EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);
 
-        return enqueuedMailsDao.insert(enqueuedMail);
+        return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
     }
 
     CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
@@ -61,14 +61,13 @@ class CassandraMailQueueMailStore {
             .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
     }
 
-    private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) {
-        return EnqueuedMail.builder()
-            .mail(mail)
-            .bucketId(computedBucketId(mail))
-            .timeRangeStart(currentSliceStartInstant())
-            .enqueuedTime(enqueuedTime)
-            .mailKey(MailKey.fromMail(mail))
-            .mailQueueName(mailQueueName)
+    private EnqueuedItemWithSlicingContext addSliceContext(EnqueuedItem enqueuedItem) {
+        Mail mail = enqueuedItem.getMail();
+
+        return EnqueuedItemWithSlicingContext.builder()
+            .enqueuedItem(enqueuedItem)
+            .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext
+                .of(computedBucketId(mail), currentSliceStartInstant()))
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index b55ce47..db77356 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -19,23 +19,22 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
-import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
-import com.google.common.collect.Iterators;
-
 public class CassandraMailQueueView implements MailQueueView {
 
     public static class Factory {
@@ -84,8 +83,8 @@ public class CassandraMailQueueView implements MailQueueView {
     }
 
     @Override
-    public CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail) {
-        return storeHelper.storeMailInEnqueueTable(mail, mailQueueName, enqueuedTime);
+    public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+        return storeHelper.storeMail(enqueuedItem);
     }
 
     @Override
@@ -98,12 +97,15 @@ public class CassandraMailQueueView implements MailQueueView {
 
     @Override
     public long getSize() {
-        return Iterators.size(browse());
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
+                .join()
+                .count();
     }
 
     @Override
     public CompletableFuture<Long> delete(DeleteCondition deleteCondition) {
         CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName)
+            .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
             .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName),
                 FluentFutureStream::unboxFuture)

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index 2b8e11b..6c3318b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -24,9 +24,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS;
@@ -54,9 +56,12 @@ import javax.inject.Inject;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.core.MailAddress;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.mailet.Mail;
 
 import com.datastax.driver.core.PreparedStatement;
@@ -69,15 +74,18 @@ class EnqueuedMailsDAO {
     private final PreparedStatement insert;
     private final CassandraUtils cassandraUtils;
     private final CassandraTypesProvider cassandraTypesProvider;
+    private final BlobId.Factory blobFactory;
 
     @Inject
-    EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider) {
+    EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider,
+                     BlobId.Factory blobIdFactory) {
         this.executor = new CassandraAsyncExecutor(session);
         this.cassandraUtils = cassandraUtils;
         this.cassandraTypesProvider = cassandraTypesProvider;
 
         this.selectFrom = prepareSelectFrom(session);
         this.insert = prepareInsert(session);
+        this.blobFactory = blobIdFactory;
     }
 
     private PreparedStatement prepareSelectFrom(Session session) {
@@ -94,6 +102,8 @@ class EnqueuedMailsDAO {
             .value(TIME_RANGE_START, bindMarker(TIME_RANGE_START))
             .value(BUCKET_ID, bindMarker(BUCKET_ID))
             .value(MAIL_KEY, bindMarker(MAIL_KEY))
+            .value(HEADER_BLOB_ID, bindMarker(HEADER_BLOB_ID))
+            .value(BODY_BLOB_ID, bindMarker(BODY_BLOB_ID))
             .value(ENQUEUED_TIME, bindMarker(ENQUEUED_TIME))
             .value(STATE, bindMarker(STATE))
             .value(SENDER, bindMarker(SENDER))
@@ -106,15 +116,20 @@ class EnqueuedMailsDAO {
             .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
     }
 
-    CompletableFuture<Void> insert(EnqueuedMail enqueuedMail) {
-        Mail mail = enqueuedMail.getMail();
+    CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
+        EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem();
+        EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext();
+        Mail mail = enqueuedItem.getMail();
+        MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId();
 
         return executor.executeVoid(insert.bind()
-            .setString(QUEUE_NAME, enqueuedMail.getMailQueueName().asString())
-            .setTimestamp(TIME_RANGE_START, Date.from(enqueuedMail.getTimeRangeStart()))
-            .setInt(BUCKET_ID, enqueuedMail.getBucketId().getValue())
-            .setTimestamp(ENQUEUED_TIME, Date.from(enqueuedMail.getEnqueuedTime()))
+            .setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
+            .setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart()))
+            .setInt(BUCKET_ID, slicingContext.getBucketId().getValue())
+            .setTimestamp(ENQUEUED_TIME, Date.from(enqueuedItem.getEnqueuedTime()))
             .setString(MAIL_KEY, mail.getName())
+            .setString(HEADER_BLOB_ID, mimeMessagePartsId.getHeaderBlobId().asString())
+            .setString(BODY_BLOB_ID, mimeMessagePartsId.getBodyBlobId().asString())
             .setString(STATE, mail.getState())
             .setString(SENDER, Optional.ofNullable(mail.getSender())
                 .map(MailAddress::asString)
@@ -125,11 +140,10 @@ class EnqueuedMailsDAO {
             .setString(REMOTE_HOST, mail.getRemoteHost())
             .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
             .setMap(ATTRIBUTES, toRawAttributeMap(mail))
-            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))
-        );
+            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())));
     }
 
-    CompletableFuture<Stream<EnqueuedMail>> selectEnqueuedMails(
+    CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails(
         MailQueueName queueName, Slice slice, BucketId bucketId) {
 
         return executor.execute(
@@ -138,7 +152,7 @@ class EnqueuedMailsDAO {
                 .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
                 .setInt(BUCKET_ID, bucketId.getValue()))
             .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(EnqueuedMailsDaoUtil::toEnqueuedMail));
+                .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
index 6be8597..40e84bc 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
@@ -20,9 +20,11 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_NAME;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_TYPE;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_VALUE;
@@ -55,11 +57,13 @@ import javax.mail.internet.AddressException;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.core.MailAddress;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Mail;
@@ -73,11 +77,18 @@ import com.google.common.collect.ImmutableMap;
 
 public class EnqueuedMailsDaoUtil {
 
-    static EnqueuedMail toEnqueuedMail(Row row) {
+    static EnqueuedItemWithSlicingContext toEnqueuedMail(Row row, BlobId.Factory blobFactory) {
         MailQueueName queueName = MailQueueName.fromString(row.getString(QUEUE_NAME));
         Instant timeRangeStart = row.getTimestamp(TIME_RANGE_START).toInstant();
         BucketedSlices.BucketId bucketId = BucketedSlices.BucketId.of(row.getInt(BUCKET_ID));
         Instant enqueuedTime = row.getTimestamp(ENQUEUED_TIME).toInstant();
+        BlobId headerBlobId = blobFactory.from(row.getString(HEADER_BLOB_ID));
+        BlobId bodyBlobId = blobFactory.from(row.getString(BODY_BLOB_ID));
+        MimeMessagePartsId mimeMessagePartsId = MimeMessagePartsId
+            .builder()
+            .headerBlobId(headerBlobId)
+            .bodyBlobId(bodyBlobId)
+            .build();
 
         MailAddress sender = Optional.ofNullable(row.getString(SENDER))
             .map(Throwing.function(MailAddress::new))
@@ -107,14 +118,17 @@ public class EnqueuedMailsDaoUtil {
             .addAllHeadersForRecipients(perRecipientHeaders)
             .attributes(toAttributes(rawAttributes))
             .build();
-
-        return EnqueuedMail.builder()
+        EnqueuedItem enqueuedItem = EnqueuedItem.builder()
+            .mailQueueName(queueName)
             .mail(mail)
-            .bucketId(bucketId)
-            .timeRangeStart(timeRangeStart)
             .enqueuedTime(enqueuedTime)
-            .mailKey(MailKey.of(name))
-            .mailQueueName(queueName)
+            .mimeMessagePartsId(mimeMessagePartsId)
+            .build();
+
+
+        return EnqueuedItemWithSlicingContext.builder()
+            .enqueuedItem(enqueuedItem)
+            .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(bucketId, timeRangeStart))
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
deleted file mode 100644
index 93d8e8b..0000000
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.rabbitmq.view.cassandra.model;
-
-import java.time.Instant;
-import java.util.Objects;
-
-import org.apache.james.queue.rabbitmq.MailQueueName;
-import org.apache.mailet.Mail;
-
-public class EnqueuedMail {
-
-    public interface Builder {
-
-        @FunctionalInterface
-        interface RequireMail {
-            RequireBucketId mail(Mail mail);
-        }
-
-        @FunctionalInterface
-        interface RequireBucketId {
-            RequireTimeRangeStart bucketId(BucketedSlices.BucketId bucketId);
-        }
-
-        @FunctionalInterface
-        interface RequireTimeRangeStart {
-            RequireEnqueuedTime timeRangeStart(Instant timeRangeStart);
-        }
-
-        @FunctionalInterface
-        interface RequireEnqueuedTime {
-            RequireMailKey enqueuedTime(Instant enqueuedTime);
-        }
-
-        @FunctionalInterface
-        interface RequireMailKey {
-            RequireMailQueueName mailKey(MailKey mailKey);
-        }
-
-        @FunctionalInterface
-        interface RequireMailQueueName {
-            LastStage mailQueueName(MailQueueName mailQueueName);
-        }
-
-        class LastStage {
-            private Mail mail;
-            private BucketedSlices.BucketId bucketId;
-            private Instant timeRangeStart;
-            private Instant enqueuedTime;
-            private MailKey mailKey;
-            private MailQueueName mailQueueName;
-
-            private LastStage(Mail mail, BucketedSlices.BucketId bucketId,
-                              Instant timeRangeStart, Instant enqueuedTime,
-                              MailKey mailKey, MailQueueName mailQueueName) {
-                this.mail = mail;
-                this.bucketId = bucketId;
-                this.timeRangeStart = timeRangeStart;
-                this.enqueuedTime = enqueuedTime;
-                this.mailKey = mailKey;
-                this.mailQueueName = mailQueueName;
-            }
-
-            public EnqueuedMail build() {
-                return new EnqueuedMail(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName);
-            }
-        }
-    }
-
-    public static Builder.RequireMail builder() {
-        return mail -> bucketId -> timeRangeStart -> enqueuedTime -> mailKey -> mailQueueName ->
-            new Builder.LastStage(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName);
-    }
-
-    private final Mail mail;
-    private final BucketedSlices.BucketId bucketId;
-    private final Instant timeRangeStart;
-    private final Instant enqueuedTime;
-    private final MailKey mailKey;
-    private final MailQueueName mailQueueName;
-
-    private EnqueuedMail(Mail mail, BucketedSlices.BucketId bucketId, Instant timeRangeStart,
-                         Instant enqueuedTime, MailKey mailKey, MailQueueName mailQueueName) {
-        this.mail = mail;
-        this.bucketId = bucketId;
-        this.timeRangeStart = timeRangeStart;
-        this.enqueuedTime = enqueuedTime;
-        this.mailKey = mailKey;
-        this.mailQueueName = mailQueueName;
-    }
-
-    public Mail getMail() {
-        return mail;
-    }
-
-    public BucketedSlices.BucketId getBucketId() {
-        return bucketId;
-    }
-
-    public MailKey getMailKey() {
-        return mailKey;
-    }
-
-    public MailQueueName getMailQueueName() {
-        return mailQueueName;
-    }
-
-    public Instant getTimeRangeStart() {
-        return timeRangeStart;
-    }
-
-    public Instant getEnqueuedTime() {
-        return enqueuedTime;
-    }
-
-    @Override
-    public final boolean equals(Object o) {
-        if (o instanceof EnqueuedMail) {
-            EnqueuedMail that = (EnqueuedMail) o;
-
-            return Objects.equals(this.bucketId, that.bucketId)
-                    && Objects.equals(this.mail, that.mail)
-                    && Objects.equals(this.timeRangeStart, that.timeRangeStart)
-                    && Objects.equals(this.enqueuedTime, that.enqueuedTime)
-                    && Objects.equals(this.mailKey, that.mailKey)
-                    && Objects.equals(this.mailQueueName, that.mailQueueName);
-        }
-        return false;
-    }
-
-    @Override
-    public final int hashCode() {
-        return Objects.hash(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 55d6d8e..c28b14c 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -115,8 +115,11 @@ class RabbitMQMailQueueConfigurationChangeTest {
     }
 
     private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws Exception {
+        CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
+        Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore();
+
         MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(clock, random, cassandra.getConf(), cassandra.getTypesProvider(),
-            mailQueueViewConfiguration)
+            mailQueueViewConfiguration, mimeMessageStore)
             .create(MailQueueName.fromString(SPOOL));
 
         RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 2e96a26..0ad51a7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -108,12 +108,17 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
         clock = new UpdatableTickingClock(IN_SLICE_1);
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
-        MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(clock, random, cassandra.getConf(), cassandra.getTypesProvider(),
+        MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(
+            clock,
+            random,
+            cassandra.getConf(),
+            cassandra.getTypesProvider(),
             CassandraMailQueueViewConfiguration.builder()
                     .bucketCount(THREE_BUCKET_COUNT)
                     .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
                     .sliceWindow(ONE_HOUR_SLICE_WINDOW)
-                    .build())
+                    .build(),
+            mimeMessageStore)
             .create(MailQueueName.fromString(SPOOL));
 
         RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder()

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 4092b0e..a42bbbc 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -23,11 +23,16 @@ import java.time.Clock;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
+import javax.mail.internet.MimeMessage;
+
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -40,12 +45,15 @@ public class CassandraMailQueueViewTestFactory {
 
     public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
                                                          CassandraTypesProvider typesProvider,
-                                                         CassandraMailQueueViewConfiguration configuration) {
-        EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION, typesProvider);
+                                                         CassandraMailQueueViewConfiguration configuration,
+                                                         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+
+        EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION, typesProvider, blobIdFactory);
         BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
         DeletedMailsDAO deletedMailsDao = new DeletedMailsDAO(session);
 
-        CassandraMailQueueBrowser cassandraMailQueueBrowser = new CassandraMailQueueBrowser(browseStartDao, deletedMailsDao, enqueuedMailsDao, configuration, clock);
+        CassandraMailQueueBrowser cassandraMailQueueBrowser = new CassandraMailQueueBrowser(browseStartDao, deletedMailsDao, enqueuedMailsDao, mimeMessageStore, configuration, clock);
         CassandraMailQueueMailStore cassandraMailQueueMailStore = new CassandraMailQueueMailStore(enqueuedMailsDao, browseStartDao, configuration, clock);
         CassandraMailQueueMailDelete cassandraMailQueueMailDelete = new CassandraMailQueueMailDelete(deletedMailsDao, browseStartDao, cassandraMailQueueBrowser, configuration, random);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
index 580ba9f..3d44db1 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -30,11 +30,14 @@ import java.util.stream.Stream;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
-import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 import org.apache.mailet.base.test.FakeMail;
-import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -48,6 +51,14 @@ class EnqueuedMailsDaoTest {
     private static final Instant NOW = Instant.now();
     private static final Slice SLICE_OF_NOW = Slice.of(NOW);
 
+    private static final BlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    private static final BlobId HEADER_BLOB_ID = BLOB_ID_FACTORY.from("header blob id");
+    private static final BlobId BODY_BLOB_ID = BLOB_ID_FACTORY.from("body blob id");
+    private static final MimeMessagePartsId MIME_MESSAGE_PARTS_ID = MimeMessagePartsId.builder()
+        .headerBlobId(HEADER_BLOB_ID)
+        .bodyBlobId(BODY_BLOB_ID)
+        .build();
+
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE);
 
@@ -55,27 +66,29 @@ class EnqueuedMailsDaoTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
+        BlobId.Factory blobFactory = new HashBlobId.Factory();
         testee = new EnqueuedMailsDAO(
             cassandra.getConf(),
             CassandraUtils.WITH_DEFAULT_CONFIGURATION,
-            cassandra.getTypesProvider());
+            cassandra.getTypesProvider(), blobFactory);
     }
 
     @Test
     void insertShouldWork() throws Exception {
-        testee.insert(EnqueuedMail.builder()
-                .mail(FakeMail.builder()
-                    .name(MAIL_KEY_1.getMailKey())
+        testee.insert(EnqueuedItemWithSlicingContext.builder()
+                .enqueuedItem(EnqueuedItem.builder()
+                    .mailQueueName(OUT_GOING_1)
+                    .mail(FakeMail.builder()
+                        .name(MAIL_KEY_1.getMailKey())
+                        .build())
+                    .enqueuedTime(NOW)
+                    .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID)
                     .build())
-                .bucketId(BucketId.of(BUCKET_ID_VALUE))
-                .timeRangeStart(NOW)
-                .enqueuedTime(NOW)
-                .mailKey(MAIL_KEY_1)
-                .mailQueueName(OUT_GOING_1)
+                .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
             .join();
 
-        Stream<EnqueuedMail> selectedEnqueuedMails = testee
+        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
             .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
             .join();
 
@@ -84,42 +97,47 @@ class EnqueuedMailsDaoTest {
 
     @Test
     void selectEnqueuedMailsShouldWork() throws Exception {
-        testee.insert(EnqueuedMail.builder()
-                .mail(FakeMail.builder()
-                    .name(MAIL_KEY_1.getMailKey())
+        testee.insert(EnqueuedItemWithSlicingContext.builder()
+                .enqueuedItem(EnqueuedItem.builder()
+                    .mailQueueName(OUT_GOING_1)
+                    .mail(FakeMail.builder()
+                        .name(MAIL_KEY_1.getMailKey())
+                        .build())
+                    .enqueuedTime(NOW)
+                    .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID)
                     .build())
-                .bucketId(BucketId.of(BUCKET_ID_VALUE))
-                .timeRangeStart(NOW)
-                .enqueuedTime(NOW)
-                .mailKey(MAIL_KEY_1)
-                .mailQueueName(OUT_GOING_1)
+                .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
             .join();
 
-        testee.insert(EnqueuedMail.builder()
-                .mail(FakeMail.builder()
-                    .name(MAIL_KEY_1.getMailKey())
+        testee.insert(EnqueuedItemWithSlicingContext.builder()
+                .enqueuedItem(EnqueuedItem.builder()
+                    .mailQueueName(OUT_GOING_1)
+                    .mail(FakeMail.builder()
+                        .name(MAIL_KEY_1.getMailKey())
+                        .build())
+                    .enqueuedTime(NOW)
+                    .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID)
                     .build())
-                .bucketId(BucketId.of(BUCKET_ID_VALUE + 1))
-                .timeRangeStart(NOW)
-                .enqueuedTime(NOW)
-                .mailKey(MAIL_KEY_1)
-                .mailQueueName(OUT_GOING_1)
+                .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW))
                 .build())
             .join();
 
-        Stream<EnqueuedMail> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
             .join();
 
         assertThat(selectedEnqueuedMails)
             .hasSize(1)
             .hasOnlyOneElementSatisfying(selectedEnqueuedMail -> {
+                EnqueuedItem enqueuedItem = selectedEnqueuedMail.getEnqueuedItem();
+                EnqueuedItemWithSlicingContext.SlicingContext slicingContext = selectedEnqueuedMail.getSlicingContext();
                 assertSoftly(softly -> {
-                    softly.assertThat(selectedEnqueuedMail.getMailQueueName()).isEqualTo(OUT_GOING_1);
-                    softly.assertThat(selectedEnqueuedMail.getBucketId()).isEqualTo(BUCKET_ID);
-                    softly.assertThat(selectedEnqueuedMail.getTimeRangeStart()).isEqualTo(NOW);
-                    softly.assertThat(selectedEnqueuedMail.getEnqueuedTime()).isEqualTo(NOW);
-                    softly.assertThat(selectedEnqueuedMail.getMailKey()).isEqualTo(MAIL_KEY_1);
+                    softly.assertThat(slicingContext.getBucketId()).isEqualTo(BUCKET_ID);
+                    softly.assertThat(slicingContext.getTimeRangeStart()).isEqualTo(NOW);
+                    softly.assertThat(enqueuedItem.getMailQueueName()).isEqualTo(OUT_GOING_1);
+                    softly.assertThat(enqueuedItem.getEnqueuedTime()).isEqualTo(NOW);
+                    softly.assertThat(enqueuedItem.getMailKey()).isEqualTo(MAIL_KEY_1);
+                    softly.assertThat(enqueuedItem.getPartsId()).isEqualTo(MIME_MESSAGE_PARTS_ID);
                 });
             });
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/a076d135/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
deleted file mode 100644
index fb0f487..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.rabbitmq.view.cassandra.model;
-
-import org.junit.jupiter.api.Test;
-
-import nl.jqno.equalsverifier.EqualsVerifier;
-
-class EnqueuedMailTest {
-
-    @Test
-    void shouldMatchBeanContract() {
-        EqualsVerifier.forClass(EnqueuedMail.class)
-            .verify();
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message