james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [2/5] james-project git commit: JAMES-2544 RabbitMQ browse Cassandra DAOs and tests
Date Wed, 26 Sep 2018 02:23:38 GMT
http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
new file mode 100644
index 0000000..6be8597
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java
@@ -0,0 +1,196 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_NAME;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_TYPE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_VALUE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.STATE;
+import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.mail.internet.AddressException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.core.MailAddress;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.james.server.core.MailImpl;
+import org.apache.james.util.streams.Iterators;
+import org.apache.mailet.Mail;
+import org.apache.mailet.PerRecipientHeaders;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.UDTValue;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class EnqueuedMailsDaoUtil {
+
+    static EnqueuedMail toEnqueuedMail(Row row) {
+        MailQueueName queueName = MailQueueName.fromString(row.getString(QUEUE_NAME));
+        Instant timeRangeStart = row.getTimestamp(TIME_RANGE_START).toInstant();
+        BucketedSlices.BucketId bucketId = BucketedSlices.BucketId.of(row.getInt(BUCKET_ID));
+        Instant enqueuedTime = row.getTimestamp(ENQUEUED_TIME).toInstant();
+
+        MailAddress sender = Optional.ofNullable(row.getString(SENDER))
+            .map(Throwing.function(MailAddress::new))
+            .orElse(null);
+        List<MailAddress> recipients = row.getList(RECIPIENTS, String.class)
+            .stream()
+            .map(Throwing.function(MailAddress::new))
+            .collect(ImmutableList.toImmutableList());
+        String state = row.getString(STATE);
+        String remoteAddr = row.getString(REMOTE_ADDR);
+        String remoteHost = row.getString(REMOTE_HOST);
+        String errorMessage = row.getString(ERROR_MESSAGE);
+        String name = row.getString(MAIL_KEY);
+        Date lastUpdated = row.getTimestamp(LAST_UPDATED);
+        Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class);
+        PerRecipientHeaders perRecipientHeaders = fromHeaderMap(row.getMap(PER_RECIPIENT_SPECIFIC_HEADERS, String.class, UDTValue.class));
+
+        MailImpl mail = MailImpl.builder()
+            .name(name)
+            .sender(sender)
+            .recipients(recipients)
+            .lastUpdated(lastUpdated)
+            .errorMessage(errorMessage)
+            .remoteHost(remoteHost)
+            .remoteAddr(remoteAddr)
+            .state(state)
+            .addAllHeadersForRecipients(perRecipientHeaders)
+            .attributes(toAttributes(rawAttributes))
+            .build();
+
+        return EnqueuedMail.builder()
+            .mail(mail)
+            .bucketId(bucketId)
+            .timeRangeStart(timeRangeStart)
+            .enqueuedTime(enqueuedTime)
+            .mailKey(MailKey.of(name))
+            .mailQueueName(queueName)
+            .build();
+    }
+
+    private static Map<String, Serializable> toAttributes(Map<String, ByteBuffer> rowAttributes) {
+        return rowAttributes.entrySet()
+            .stream()
+            .collect(ImmutableMap.toImmutableMap(
+                Map.Entry::getKey,
+                entry -> fromByteBuffer(entry.getValue())));
+    }
+
+    private static Serializable fromByteBuffer(ByteBuffer byteBuffer) {
+        try {
+            byte[] data = new byte[byteBuffer.remaining()];
+            byteBuffer.get(data);
+            ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data));
+            return (Serializable) objectInputStream.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static PerRecipientHeaders fromHeaderMap(Map<String, UDTValue> rawMap) {
+        PerRecipientHeaders result = new PerRecipientHeaders();
+
+        rawMap.forEach((key, value) -> result.addHeaderForRecipient(PerRecipientHeaders.Header.builder()
+                .name(value.getString(HEADER_NAME))
+                .value(value.getString(HEADER_VALUE))
+                .build(),
+            toMailAddress(key)));
+        return result;
+    }
+
+    private static MailAddress toMailAddress(String rawValue) {
+        try {
+            return new MailAddress(rawValue);
+        } catch (AddressException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static ImmutableList<String> asStringList(Collection<MailAddress> mailAddresses) {
+        return mailAddresses.stream()
+            .map(MailAddress::asString)
+            .collect(ImmutableList.toImmutableList());
+    }
+
+    static ImmutableMap<String, ByteBuffer> toRawAttributeMap(Mail mail) {
+        return Iterators.toStream(mail.getAttributeNames())
+            .map(name -> Pair.of(name, mail.getAttribute(name)))
+            .map(pair -> Pair.of(pair.getLeft(), toByteBuffer(pair.getRight())))
+            .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));
+    }
+
+    private static ByteBuffer toByteBuffer(Serializable serializable) {
+        try {
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            new ObjectOutputStream(outputStream).writeObject(serializable);
+            return ByteBuffer.wrap(outputStream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static ImmutableMap<String, UDTValue> toHeaderMap(CassandraTypesProvider cassandraTypesProvider,
+                                                      PerRecipientHeaders perRecipientHeaders) {
+        return perRecipientHeaders.getHeadersByRecipient()
+            .asMap()
+            .entrySet()
+            .stream()
+            .flatMap(entry -> entry.getValue().stream().map(value -> Pair.of(entry.getKey(), value)))
+            .map(entry -> Pair.of(entry.getKey().asString(),
+                cassandraTypesProvider.getDefinedUserType(HEADER_TYPE)
+                    .newValue()
+                    .setString(HEADER_NAME, entry.getRight().getName())
+                    .setString(HEADER_VALUE, entry.getRight().getValue())))
+            .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
index f7d5ac1..1f5d3d5 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra.model;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Objects;
 import java.util.stream.LongStream;
@@ -37,7 +38,7 @@ public class BucketedSlices {
         private final int value;
 
         private BucketId(int value) {
-            Preconditions.checkArgument(value >= 0, "sliceWindowSizeInSecond should not be negative");
+            Preconditions.checkArgument(value >= 0, "bucketId should not be negative");
 
             this.value = value;
         }
@@ -64,19 +65,19 @@ public class BucketedSlices {
 
     public static class Slice {
 
-        public static Slice of(Instant sliceStartInstant, long sliceWindowSizeInSecond) {
-            return new Slice(sliceStartInstant, sliceWindowSizeInSecond);
+        public static Slice of(Instant sliceStartInstant, Duration sliceWindowSize) {
+            return new Slice(sliceStartInstant, sliceWindowSize);
         }
 
         public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt) {
             long sliceCount = calculateSliceCount(firstSlice, endAt);
             long startAtSeconds =  firstSlice.getStartSliceInstant().getEpochSecond();
-            long sliceWindowSizeInSecond = firstSlice.getSliceWindowSizeInSecond();
+            long sliceWindowSizeInSecond = firstSlice.getSliceWindowSize().getSeconds();
 
             return LongStream.range(0, sliceCount)
                 .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
                 .mapToObj(Instant::ofEpochSecond)
-                .map(sliceStartInstant -> Slice.of(sliceStartInstant, firstSlice.getSliceWindowSizeInSecond()));
+                .map(sliceStartInstant -> Slice.of(sliceStartInstant, firstSlice.getSliceWindowSize()));
         }
 
         private static long calculateSliceCount(Slice firstSlice, Instant endAt) {
@@ -87,27 +88,27 @@ public class BucketedSlices {
             if (timeDiffInSecond < 0) {
                 return 0;
             } else {
-                return (timeDiffInSecond / firstSlice.sliceWindowSizeInSecond) + 1;
+                return (timeDiffInSecond / firstSlice.getSliceWindowSize().getSeconds()) + 1;
             }
         }
 
         private final Instant startSliceInstant;
-        private final long sliceWindowSizeInSecond;
+        private final Duration sliceWindowSize;
 
-        private Slice(Instant startSliceInstant, long sliceWindowSizeInSecond) {
+        private Slice(Instant startSliceInstant, Duration sliceWindowSize) {
             Preconditions.checkNotNull(startSliceInstant);
-            Preconditions.checkArgument(sliceWindowSizeInSecond > 0, "sliceWindowSizeInSecond should be positive");
+            Preconditions.checkNotNull(sliceWindowSize);
 
             this.startSliceInstant = startSliceInstant;
-            this.sliceWindowSizeInSecond = sliceWindowSizeInSecond;
+            this.sliceWindowSize = sliceWindowSize;
         }
 
         public Instant getStartSliceInstant() {
             return startSliceInstant;
         }
 
-        public long getSliceWindowSizeInSecond() {
-            return sliceWindowSizeInSecond;
+        public Duration getSliceWindowSize() {
+            return sliceWindowSize;
         }
 
         @Override
@@ -115,7 +116,7 @@ public class BucketedSlices {
             if (o instanceof Slice) {
                 Slice slice = (Slice) o;
 
-                return Objects.equals(this.sliceWindowSizeInSecond, slice.sliceWindowSizeInSecond)
+                return Objects.equals(this.sliceWindowSize, slice.sliceWindowSize)
                     && Objects.equals(this.startSliceInstant, slice.startSliceInstant);
             }
             return false;
@@ -123,7 +124,7 @@ public class BucketedSlices {
 
         @Override
         public final int hashCode() {
-            return Objects.hash(startSliceInstant, sliceWindowSizeInSecond);
+            return Objects.hash(startSliceInstant, sliceWindowSize);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
index 6e022b3..93d8e8b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
@@ -20,7 +20,6 @@
 package org.apache.james.queue.rabbitmq.view.cassandra.model;
 
 import java.time.Instant;
-import java.util.Comparator;
 import java.util.Objects;
 
 import org.apache.james.queue.rabbitmq.MailQueueName;
@@ -90,10 +89,6 @@ public class EnqueuedMail {
             new Builder.LastStage(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName);
     }
 
-    public static Comparator<EnqueuedMail> getEnqueuedTimeComparator() {
-        return Comparator.comparing(EnqueuedMail::getEnqueuedTime);
-    }
-
     private final Mail mail;
     private final BucketedSlices.BucketId bucketId;
     private final Instant timeRangeStart;
@@ -143,6 +138,7 @@ public class EnqueuedMail {
             return Objects.equals(this.bucketId, that.bucketId)
                     && Objects.equals(this.mail, that.mail)
                     && Objects.equals(this.timeRangeStart, that.timeRangeStart)
+                    && Objects.equals(this.enqueuedTime, that.enqueuedTime)
                     && Objects.equals(this.mailKey, that.mailKey)
                     && Objects.equals(this.mailQueueName, that.mailQueueName);
         }
@@ -151,6 +147,6 @@ public class EnqueuedMail {
 
     @Override
     public final int hashCode() {
-        return Objects.hash(mail, bucketId, timeRangeStart, mailKey, mailQueueName);
+        return Objects.hash(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
index c39dc16..4f17a71 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra.model;
 
+import java.util.Objects;
+
 import org.apache.mailet.Mail;
 
 import com.google.common.base.Preconditions;
@@ -45,4 +47,19 @@ public class MailKey {
     public String getMailKey() {
         return mailKey;
     }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof MailKey) {
+            MailKey mailKey1 = (MailKey) o;
+
+            return Objects.equals(this.mailKey, mailKey1.mailKey);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(mailKey);
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index f883883..92b384a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -19,11 +19,23 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.queue.api.Mails.defaultMail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import javax.mail.internet.MimeMessage;
 
@@ -34,7 +46,8 @@ import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
-import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
@@ -43,33 +56,51 @@ import org.apache.james.blob.cassandra.CassandraBlobsDAO;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueContract;
 import org.apache.james.queue.api.MailQueueMetricContract;
 import org.apache.james.queue.api.MailQueueMetricExtension;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.ManageableMailQueueContract;
+import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewConfiguration;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
+import org.apache.james.util.streams.Iterators;
+import org.apache.mailet.Mail;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.github.fge.lambdas.Throwing;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
-@ExtendWith({ReusableDockerRabbitMQExtension.class, DockerCassandraExtension.class})
-public class RabbitMQMailQueueTest implements MailQueueContract, MailQueueMetricContract {
+@ExtendWith(ReusableDockerRabbitMQExtension.class)
+public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    private static final int THREE_BUCKET_COUNT = 3;
+    private static final int UPDATE_BROWSE_START_PACE = 2;
+    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
+    private static final String SPOOL = "spool";
+    private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
+    private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
+    private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
+    private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
+    private static final Instant IN_SLICE_6 = IN_SLICE_1.plus(6, HOURS);
 
-    private static CassandraCluster cassandra;
 
-    private RabbitMQMailQueueFactory mailQueueFactory;
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
+        CassandraBlobModule.MODULE,
+        CassandraMailQueueViewModule.MODULE));
 
-    @BeforeAll
-    static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) {
-        cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost());
-    }
+    private RabbitMQMailQueueFactory mailQueueFactory;
+    private Clock clock;
 
     @BeforeEach
-    void setup(DockerRabbitMQ rabbitMQ, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws IOException, TimeoutException, URISyntaxException {
+    void setup(DockerRabbitMQ rabbitMQ, CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws IOException, TimeoutException, URISyntaxException {
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore();
 
@@ -83,44 +114,217 @@ public class RabbitMQMailQueueTest implements MailQueueContract, MailQueueMetric
             .setHost(rabbitMQ.getHostIp())
             .setPort(rabbitMQ.getAdminPort())
             .build();
+        clock = mock(Clock.class);
+        when(clock.instant()).thenReturn(Instant.parse("2007-12-03T10:15:30.00Z"));
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(clock, random, cassandra.getConf(), cassandra.getTypesProvider(),
+            CassandraMailQueueViewConfiguration.builder()
+                    .bucketCount(THREE_BUCKET_COUNT)
+                    .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
+                    .sliceWindow(ONE_HOUR_SLICE_WINDOW)
+                    .build())
+            .create(MailQueueName.fromString(SPOOL));
 
         RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder()
             .amqpUri(amqpUri)
             .managementUri(rabbitManagementUri)
             .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(
-            rabbitMQConfiguration,
-            new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration,
+                new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory));
         RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(
-            metricTestSystem.getSpyMetricFactory(),
-            rabbitClient,
-            mimeMessageStore,
-            BLOB_ID_FACTORY);
+                    metricTestSystem.getSpyMetricFactory(),
+                    rabbitClient,
+                    mimeMessageStore,
+                    BLOB_ID_FACTORY,
+                    mailQueueView);
+
         RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown(CassandraCluster cassandra) {
         cassandra.clearTables();
     }
 
     @AfterAll
-    static void tearDownClass() {
+    static void tearDownClass(CassandraCluster cassandra) {
         cassandra.closeCluster();
     }
 
     @Override
     public MailQueue getMailQueue() {
-        return mailQueueFactory.createQueue("spool");
+        return mailQueueFactory.createQueue(SPOOL);
+    }
+
+    @Override
+    public ManageableMailQueue getManageableMailQueue() {
+        return mailQueueFactory.createQueue(SPOOL);
+    }
+
+    @Test
+    void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
+        ManageableMailQueue mailQueue = getManageableMailQueue();
+        int emailCount = 5;
+
+        when(clock.instant()).thenReturn(IN_SLICE_1);
+        enqueueMailsInSlice(1, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_2);
+        enqueueMailsInSlice(2, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_3);
+        enqueueMailsInSlice(3, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_5);
+        enqueueMailsInSlice(5, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_6);
+        Stream<String> names = Iterators.toStream(mailQueue.browse())
+            .map(ManageableMailQueue.MailQueueItemView::getMail)
+            .map(Mail::getName);
+
+        assertThat(names).containsExactly(
+            "1-1", "1-2", "1-3", "1-4", "1-5",
+            "2-1", "2-2", "2-3", "2-4", "2-5",
+            "3-1", "3-2", "3-3", "3-4", "3-5",
+            "5-1", "5-2", "5-3", "5-4", "5-5");
+    }
+
+    @Test
+    void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
+        ManageableMailQueue mailQueue = getManageableMailQueue();
+        int emailCount = 5;
+
+        when(clock.instant()).thenReturn(IN_SLICE_1);
+        enqueueMailsInSlice(1, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_2);
+        enqueueMailsInSlice(2, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_3);
+        enqueueMailsInSlice(3, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_5);
+        enqueueMailsInSlice(5, emailCount);
+
+        when(clock.instant()).thenReturn(IN_SLICE_6);
+
+        dequeueMails(5);
+
+        dequeueMails(3);
+        MailQueue.MailQueueItem item2_4 = mailQueue.deQueue();
+        item2_4.done(false);
+        dequeueMails(1);
+
+        dequeueMails(5);
+
+        Stream<String> names = Iterators.toStream(mailQueue.browse())
+            .map(ManageableMailQueue.MailQueueItemView::getMail)
+            .map(Mail::getName);
+
+        assertThat(names)
+            .containsExactly("2-4", "5-1", "5-2", "5-3", "5-4", "5-5");
+    }
+
+    @Disabled
+    @Override
+    public void clearShouldNotFailWhenBrowsingIterating() {
+
+    }
+
+    @Disabled
+    @Override
+    public void browseShouldNotFailWhenConcurrentClearWhenIterating() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeShouldNotFailWhenBrowsingIterating() {
+
+    }
+
+    @Disabled
+    @Override
+    public void browseShouldNotFailWhenConcurrentRemoveWhenIterating() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeByNameShouldRemoveSpecificEmail() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeBySenderShouldRemoveSpecificEmail() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeByRecipientShouldRemoveSpecificEmail() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeByNameShouldNotFailWhenQueueIsEmpty() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeBySenderShouldNotFailWhenQueueIsEmpty() {
+
+    }
+
+    @Disabled
+    @Override
+    public void removeByRecipientShouldNotFailWhenQueueIsEmpty() {
+
+    }
+
+    @Disabled
+    @Override
+    public void clearShouldNotFailWhenQueueIsEmpty() {
+
+    }
+
+    @Disabled
+    @Override
+    public void clearShouldRemoveAllElements() {
     }
 
     @Disabled("RabbitMQ Mail Queue do not yet implement getSize()")
     @Override
     public void constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) {
+    }
+
+    private void enqueueMailsInSlice(int slice, int emailCount) {
+        ManageableMailQueue mailQueue = getManageableMailQueue();
+
+        IntStream.rangeClosed(1, emailCount)
+            .forEach(Throwing.intConsumer(bucketId -> mailQueue.enQueue(defaultMail()
+                .name(slice + "-" + bucketId)
+                .build())));
+    }
 
+    private void dequeueMails(int times) {
+        ManageableMailQueue mailQueue = getManageableMailQueue();
+        IntStream.rangeClosed(1, times)
+            .forEach(Throwing.intConsumer(bucketId -> mailQueue.deQueue().done(true)));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 18cca28..bf7d37e 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -36,11 +36,12 @@ import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension;
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
-import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
+import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -55,6 +56,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
     @BeforeEach
     void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException {
         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = mock(Store.class);
+        MailQueueView mailQueueView = mock(MailQueueView.class);
 
         URI amqpUri = new URIBuilder()
             .setScheme("amqp")
@@ -78,8 +80,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory));
-        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
-
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY, mailQueueView);
         RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
     }
@@ -88,4 +89,4 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
     public MailQueueFactory<RabbitMQMailQueue> getMailQueueFactory() {
         return mailQueueFactory;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
new file mode 100644
index 0000000..ef844c6
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class BrowseStartDAOTest {
+
+    private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
+    private static final MailQueueName OUT_GOING_2 = MailQueueName.fromString("OUT_GOING_2");
+    private static final Instant NOW = Instant.now();
+    private static final Instant NOW_PLUS_TEN_SECONDS = NOW.plusSeconds(10);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE);
+
+    private BrowseStartDAO testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new BrowseStartDAO(cassandra.getConf());
+    }
+
+    @Test
+    void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() {
+        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+
+        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
+        assertThat(firstEnqueuedItemFromQueue2)
+            .isEmpty();
+    }
+
+    @Test
+    void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() {
+        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_2, NOW).join();
+
+        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
+        assertThat(firstEnqueuedItemFromQueue2)
+            .isNotEmpty();
+    }
+
+    @Test
+    void updateFirstEnqueuedTimeShouldWork() {
+        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+
+        assertThat(testee.selectOne(OUT_GOING_1).join())
+            .isNotEmpty();
+    }
+
+    @Test
+    void insertInitialBrowseStartShouldInsertFirstInstant() {
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join();
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join();
+
+        assertThat(testee.findBrowseStart(OUT_GOING_1).join())
+            .contains(NOW);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
new file mode 100644
index 0000000..95733c2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -0,0 +1,48 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Clock;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+
+import com.datastax.driver.core.Session;
+
+public class CassandraMailQueueViewTestFactory {
+
+    public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
+                                                         CassandraTypesProvider typesProvider,
+                                                         CassandraMailQueueViewConfiguration configuration) {
+        EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION, typesProvider);
+        BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
+        DeletedMailsDAO deletedMailsDao = new DeletedMailsDAO(session);
+
+        CassandraMailQueueBrowser cassandraMailQueueBrowser = new CassandraMailQueueBrowser(browseStartDao, deletedMailsDao, enqueuedMailsDao, configuration, clock);
+        CassandraMailQueueMailStore cassandraMailQueueMailStore = new CassandraMailQueueMailStore(enqueuedMailsDao, browseStartDao, configuration, clock);
+        CassandraMailQueueMailDelete cassandraMailQueueMailDelete = new CassandraMailQueueMailDelete(deletedMailsDao, browseStartDao, cassandraMailQueueBrowser, configuration, random);
+
+        return new CassandraMailQueueView.Factory(
+            cassandraMailQueueMailStore,
+            cassandraMailQueueBrowser,
+            cassandraMailQueueMailDelete);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
new file mode 100644
index 0000000..d9dc69a
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
@@ -0,0 +1,108 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class DeletedMailsDAOTest {
+
+    private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
+    private static final MailQueueName OUT_GOING_2 = MailQueueName.fromString("OUT_GOING_2");
+    private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1");
+    private static final MailKey MAIL_KEY_2 = MailKey.of("mailkey2");
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE);
+
+    private DeletedMailsDAO testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new DeletedMailsDAO(cassandra.getConf());
+    }
+
+    @Test
+    void markAsDeletedShouldWork() {
+        Boolean isDeletedBeforeMark = testee
+                .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+                .join();
+        assertThat(isDeletedBeforeMark).isFalse();
+
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+
+        Boolean isDeletedAfterMark = testee
+            .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+            .join();
+
+        assertThat(isDeletedAfterMark).isTrue();
+    }
+
+    @Test
+    void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() {
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join();
+
+        Boolean isDeleted = testee
+            .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+            .join();
+
+        assertThat(isDeleted).isFalse();
+    }
+
+    @Test
+    void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() {
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join();
+
+        Boolean isDeleted = testee
+            .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+            .join();
+
+        assertThat(isDeleted).isFalse();
+    }
+
+    @Test
+    void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() {
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join();
+
+        Boolean isDeleted = testee
+            .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+            .join();
+
+        assertThat(isDeleted).isFalse();
+    }
+
+    @Test
+    void checkDeletedShouldReturnTrueWhenTableContainsMailItem() {
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+
+        Boolean isDeleted = testee
+            .isDeleted(OUT_GOING_1, MAIL_KEY_1)
+            .join();
+
+        assertThat(isDeleted).isTrue();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
new file mode 100644
index 0000000..94a80a3
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
+import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
+import org.apache.mailet.base.test.FakeMail;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class EnqueuedMailsDaoTest {
+
+    private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
+    private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1");
+    private static int BUCKET_ID_VALUE = 10;
+    private static final BucketId BUCKET_ID = BucketId.of(BUCKET_ID_VALUE);
+    private static final Instant NOW = Instant.now();
+    private static final Slice SLICE_OF_NOW = Slice.of(NOW, Duration.ofSeconds(100));
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE);
+
+    private EnqueuedMailsDAO testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new EnqueuedMailsDAO(
+            cassandra.getConf(),
+            CassandraUtils.WITH_DEFAULT_CONFIGURATION,
+            cassandra.getTypesProvider());
+    }
+
+    @Test
+    void insertShouldWork() throws Exception {
+        testee.insert(EnqueuedMail.builder()
+                .mail(FakeMail.builder()
+                    .name(MAIL_KEY_1.getMailKey())
+                    .build())
+                .bucketId(BucketId.of(BUCKET_ID_VALUE))
+                .timeRangeStart(NOW)
+                .enqueuedTime(NOW)
+                .mailKey(MAIL_KEY_1)
+                .mailQueueName(OUT_GOING_1)
+                .build())
+            .join();
+
+        Stream<EnqueuedMail> selectedEnqueuedMails = testee
+            .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+            .join();
+
+        assertThat(selectedEnqueuedMails).hasSize(1);
+    }
+
+    @Test
+    void selectEnqueuedMailsShouldWork() throws Exception {
+        testee.insert(EnqueuedMail.builder()
+                .mail(FakeMail.builder()
+                    .name(MAIL_KEY_1.getMailKey())
+                    .build())
+                .bucketId(BucketId.of(BUCKET_ID_VALUE))
+                .timeRangeStart(NOW)
+                .enqueuedTime(NOW)
+                .mailKey(MAIL_KEY_1)
+                .mailQueueName(OUT_GOING_1)
+                .build())
+            .join();
+
+        testee.insert(EnqueuedMail.builder()
+                .mail(FakeMail.builder()
+                    .name(MAIL_KEY_1.getMailKey())
+                    .build())
+                .bucketId(BucketId.of(BUCKET_ID_VALUE + 1))
+                .timeRangeStart(NOW)
+                .enqueuedTime(NOW)
+                .mailKey(MAIL_KEY_1)
+                .mailQueueName(OUT_GOING_1)
+                .build())
+            .join();
+
+        Stream<EnqueuedMail> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+            .join();
+
+        assertThat(selectedEnqueuedMails)
+            .hasSize(1)
+            .hasOnlyOneElementSatisfying(selectedEnqueuedMail -> {
+                SoftAssertions softly = new SoftAssertions();
+                softly.assertThat(selectedEnqueuedMail.getMailQueueName()).isEqualTo(OUT_GOING_1);
+                softly.assertThat(selectedEnqueuedMail.getBucketId()).isEqualTo(BUCKET_ID);
+                softly.assertThat(selectedEnqueuedMail.getTimeRangeStart()).isEqualTo(NOW);
+                softly.assertThat(selectedEnqueuedMail.getEnqueuedTime()).isEqualTo(NOW);
+                softly.assertThat(selectedEnqueuedMail.getMailKey()).isEqualTo(MAIL_KEY_1);
+                softly.assertAll();
+            });
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
index 110d870..5386050 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -21,13 +21,17 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model;
 
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.stream.Stream;
 
-import org.junit.jupiter.api.Nested;
+import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
 import org.junit.jupiter.api.Test;
 
+import nl.jqno.equalsverifier.EqualsVerifier;
+
 class BucketedSlicesTest {
     
     private static final long ONE_HOUR_IN_SECONDS = 3600;
@@ -36,28 +40,64 @@ class BucketedSlicesTest {
     private static final Instant FIRST_SLICE_INSTANT_NEXT_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS);
     private static final Instant FIRST_SLICE_INSTANT_NEXT_TWO_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS * 2);
 
-    private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, ONE_HOUR_IN_SECONDS);
-    private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS);
+    private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofSeconds(ONE_HOUR_IN_SECONDS);
+    private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
+    private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, Duration.ofSeconds(ONE_HOUR_IN_SECONDS));
+
+    @Test
+    void bucketIdShouldMatchBeanContract() {
+        EqualsVerifier.forClass(BucketId.class)
+            .verify();
+    }
+
+    @Test
+    void sliceShouldMatchBeanContract() {
+        EqualsVerifier.forClass(Slice.class)
+            .verify();
+    }
 
-    @Nested
-    class Validation {
+    @Test
+    void bucketIdShouldThrowWhenValueIsNegative() {
+        assertThatThrownBy(() -> BucketId.of(-1))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
-        assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(3599)))
+        assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1)))
             .containsOnly(FIRST_SLICE);
     }
 
     @Test
     void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
-        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(3599));
+        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1));
 
         assertThat(allSlices)
             .containsExactly(
                 FIRST_SLICE,
-                Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_IN_SECONDS),
-                Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS));
+                Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_SLICE_WINDOW),
+                Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW));
+    }
+
+    @Test
+    void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() {
+        Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR);
+        Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000));
+        Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1));
+
+        Slice [] allSlicesInThreeHours = {
+            FIRST_SLICE,
+            Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_SLICE_WINDOW),
+            Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW)};
+
+        assertThat(allSlicesEndAtTheStartOfWindow)
+            .containsExactly(allSlicesInThreeHours);
+
+        assertThat(allSlicesEndAtTheMiddleOfWindow)
+            .containsExactly(allSlicesInThreeHours);
+
+        assertThat(allSlicesEndAtTheEndWindow)
+            .containsExactly(allSlicesInThreeHours);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
new file mode 100644
index 0000000..fb0f487
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class EnqueuedMailTest {
+
+    @Test
+    void shouldMatchBeanContract() {
+        EqualsVerifier.forClass(EnqueuedMail.class)
+            .verify();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java
new file mode 100644
index 0000000..c1a854e
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class MailKeyTest {
+
+    @Test
+    void shouldMatchBeanContract() {
+        EqualsVerifier.forClass(MailKey.class)
+            .verify();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message