james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [3/3] james-project git commit: JAMES-2550 Mailqueue with reactor
Date Thu, 06 Dec 2018 08:05:40 GMT
JAMES-2550 Mailqueue with reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f121dd8d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f121dd8d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f121dd8d

Branch: refs/heads/master
Commit: f121dd8d413321db9a85a5cce42b572da392c2b5
Parents: e96b5cb
Author: Benoit Tellier <btellier@linagora.com>
Authored: Tue Dec 4 16:10:33 2018 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Thu Dec 6 15:04:29 2018 +0700

----------------------------------------------------------------------
 pom.xml                                         |  7 +++
 server/queue/queue-rabbitmq/pom.xml             |  9 +++
 .../apache/james/queue/rabbitmq/Dequeuer.java   |  2 +-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java |  4 +-
 .../queue/rabbitmq/view/api/MailQueueView.java  |  2 +-
 .../rabbitmq/view/cassandra/BrowseStartDAO.java | 26 ++++----
 .../cassandra/CassandraMailQueueBrowser.java    | 63 +++++++++-----------
 .../cassandra/CassandraMailQueueMailDelete.java | 31 +++++-----
 .../cassandra/CassandraMailQueueMailStore.java  |  7 ++-
 .../view/cassandra/CassandraMailQueueView.java  | 42 ++++++-------
 .../view/cassandra/DeletedMailsDAO.java         | 20 +++----
 .../view/cassandra/EnqueuedMailsDAO.java        | 25 ++++----
 .../view/cassandra/model/BucketedSlices.java    | 22 +++----
 .../view/cassandra/BrowseStartDAOTest.java      | 27 +++++----
 .../CassandraMailQueueViewTestFactory.java      | 10 +++-
 .../view/cassandra/DeletedMailsDAOTest.java     | 22 +++----
 .../view/cassandra/EnqueuedMailsDaoTest.java    | 16 ++---
 .../cassandra/model/BucketedSlicesTest.java     | 12 ++--
 18 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 300a18b..c4d85bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -666,6 +666,13 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-bom</artifactId>
+                <version>Bismuth-RELEASE</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
                 <groupId>${james.groupId}</groupId>
                 <artifactId>apache-james-backends-cassandra</artifactId>
                 <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 14564ff..ab4ffff 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -171,6 +171,15 @@
             <version>${feign.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 6caef63..76e9838 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -103,7 +103,7 @@ class Dequeuer {
                 if (success) {
                     dequeueMetric.increment();
                     rabbitClient.ack(deliveryTag);
-                    mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
+                    mailQueueView.delete(DeleteCondition.withName(mail.getName()));
                 } else {
                     rabbitClient.nack(deliveryTag);
                 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 0909469..1873313 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -93,12 +93,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
 
     @Override
     public long clear() {
-        return mailQueueView.delete(DeleteCondition.all()).join();
+        return mailQueueView.delete(DeleteCondition.all());
     }
 
     @Override
     public long remove(Type type, String value) {
-        return mailQueueView.delete(DeleteCondition.from(type, value)).join();
+        return mailQueueView.delete(DeleteCondition.from(type, value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index a291d61..12ca723 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -36,7 +36,7 @@ public interface MailQueueView {
 
     CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);
 
-    CompletableFuture<Long> delete(DeleteCondition deleteCondition);
+    long delete(DeleteCondition deleteCondition);
 
     CompletableFuture<Boolean> isPresent(Mail mail);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 4425c46..9552f5c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,8 +31,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
 
 import java.time.Instant;
 import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
 
 public class BrowseStartDAO {
 
@@ -79,28 +78,29 @@ public class BrowseStartDAO {
             .value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
     }
 
-    CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) {
+    Mono<Instant> findBrowseStart(MailQueueName queueName) {
         return selectOne(queueName)
-            .thenApply(optional -> optional.map(this::getBrowseStart));
+            .map(this::getBrowseStart);
     }
 
-    CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
-        return executor.executeVoid(updateOne.bind()
+    Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+        return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind()
             .setTimestamp(BROWSE_START, Date.from(sliceStart))
-            .setString(QUEUE_NAME, mailQueueName.asString()));
+            .setString(QUEUE_NAME, mailQueueName.asString())));
     }
 
-    CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
-        return executor.executeVoid(insertOne.bind()
+    Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+        return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
             .setTimestamp(BROWSE_START, Date.from(sliceStart))
-            .setString(QUEUE_NAME, mailQueueName.asString()));
+            .setString(QUEUE_NAME, mailQueueName.asString())));
     }
 
     @VisibleForTesting
-    CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
-        return executor.executeSingleRow(
+    Mono<Row> selectOne(MailQueueName queueName) {
+        return Mono.fromCompletionStage(executor.executeSingleRow(
             selectOne.bind()
-                .setString(QUEUE_NAME, queueName.asString()));
+                .setString(QUEUE_NAME, queueName.asString())))
+            .flatMap(Mono::justOrEmpty);
     }
 
     private Instant getBrowseStart(Row row) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index a5a9bb9..4279c15 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -21,16 +21,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
-import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;
 
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
+import java.util.List;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -44,12 +40,14 @@ import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraMailQueueBrowser {
 
@@ -101,23 +99,24 @@ public class CassandraMailQueueBrowser {
         this.clock = clock;
     }
 
-    CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
+    Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
         return browseReferences(queueName)
-            .map(this::toMailFuture, FluentFutureStream::unboxFuture)
-            .map(ManageableMailQueue.MailQueueItemView::new)
-            .completableFuture();
+            .flatMapSequential(this::toMailFuture)
+            .map(ManageableMailQueue.MailQueueItemView::new);
     }
 
-    FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
-        return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
-            .thenApply(this::allSlicesStartingAt))
-            .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
+    Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
+        return browseStartDao.findBrowseStart(queueName)
+            .flatMapMany(this::allSlicesStartingAt)
+            .flatMapSequential(slice -> browseSlice(queueName, slice))
+            .flatMapSequential(Flux::fromIterable)
+            .subscribeOn(Schedulers.parallel());
     }
 
-    private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+    private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
         EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
-        return mimeMessageStore.read(enqueuedItem.getPartsId())
-            .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+        return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
+            .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
     }
 
     private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -132,31 +131,25 @@ public class CassandraMailQueueBrowser {
         return mail;
     }
 
-    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
-        return FluentFutureStream.of(
+    private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) {
+        return
             allBucketIds()
-                .map(bucketId ->
-                    browseBucket(queueName, slice, bucketId).completableFuture()),
-            FluentFutureStream::unboxStream)
-            .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+                .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
+                .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
-    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
-        return FluentFutureStream.of(
-            enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
-                .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
+    private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
+        return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
     }
 
-    private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {
-        return maybeBrowseStart
-            .map(Slice::of)
-            .map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow()))
-            .orElse(Stream.empty());
+    private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
+        return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow()));
     }
 
-    private Stream<BucketId> allBucketIds() {
-        return IntStream
+    private Flux<BucketId> allBucketIds() {
+        return Flux
             .range(0, configuration.getBucketCount())
-            .mapToObj(BucketId::of);
+            .map(BucketId::of);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index b94b1ed..57732cb 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -20,10 +20,7 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Instant;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -32,6 +29,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueMailDelete {
 
     private final DeletedMailsDAO deletedMailsDao;
@@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete {
         this.random = random;
     }
 
-    CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
+    Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
         return considerDeleted(MailKey.fromMail(mail), mailQueueName);
     }
 
-    CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
+    Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
         return deletedMailsDao
             .markAsDeleted(mailQueueName, mailKey)
-            .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
+            .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
     }
 
-    CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
+    Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
         return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail));
     }
 
-    CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) {
-        return findNewBrowseStart(mailQueueName)
-            .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart));
+    void updateBrowseStart(MailQueueName mailQueueName) {
+        Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName);
+        updateNewBrowseStart(mailQueueName, newBrowseStart);
     }
 
     private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
         if (shouldUpdateBrowseStart()) {
-            updateBrowseStart(mailQueueName).join();
+            updateBrowseStart(mailQueueName);
         }
     }
 
-    private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
+    private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
-            .completableFuture()
-            .thenApply(Stream::findFirst);
+            .next();
     }
 
-    private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) {
+    private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) {
         return maybeNewBrowseStart
-            .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
-            .orElse(CompletableFuture.completedFuture(null));
+            .flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant));
     }
 
     private boolean shouldUpdateBrowseStart() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index 41c28a9..1dd07f5 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Clock;
 import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -32,6 +31,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Bucke
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueMailStore {
 
     private final EnqueuedMailsDAO enqueuedMailsDao;
@@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore {
         this.clock = clock;
     }
 
-    CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+    Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
         EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);
 
         return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
     }
 
-    CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
+    Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) {
         return browseStartDao
             .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 4980f46..a0ca3cd 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -20,7 +20,6 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -33,9 +32,10 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueView implements MailQueueView {
 
     public static class Factory implements MailQueueView.Factory {
@@ -80,62 +80,54 @@ public class CassandraMailQueueView implements MailQueueView {
 
     @Override
     public void initialize(MailQueueName mailQueueName) {
-        storeHelper.initializeBrowseStart(mailQueueName)
-            .join();
+        storeHelper.initializeBrowseStart(mailQueueName).block();
     }
 
     @Override
     public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
-        return storeHelper.storeMail(enqueuedItem);
+        return storeHelper.storeMail(enqueuedItem).toFuture();
     }
 
     @Override
     public ManageableMailQueue.MailQueueIterator browse() {
         return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
             cassandraMailQueueBrowser.browse(mailQueueName)
-                .join()
+                .toIterable()
                 .iterator());
     }
 
     @Override
     public long getSize() {
-        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
-                .join()
-                .count();
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block();
     }
 
     @Override
-    public CompletableFuture<Long> delete(DeleteCondition deleteCondition) {
+    public long delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithName) {
             DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition;
-
-            return delete(MailKey.of(nameDeleteCondition.getName())).thenApply(any -> 1L);
+            return delete(MailKey.of(nameDeleteCondition.getName())).map(any -> 1L).block();
         }
-
         return browseThenDelete(deleteCondition);
     }
 
-    private CompletableFuture<Long> browseThenDelete(DeleteCondition deleteCondition) {
-        CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName)
+    private long browseThenDelete(DeleteCondition deleteCondition) {
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
-            .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName),
-                FluentFutureStream::unboxFuture)
-            .completableFuture()
-            .thenApply(Stream::count);
-
-        result.thenRunAsync(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
-
-        return result;
+            .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName))
+            .count()
+            .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+            .block();
     }
 
-    private CompletableFuture<Void> delete(MailKey mailKey) {
+    private Mono<Void> delete(MailKey mailKey) {
         return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName);
     }
 
     @Override
     public CompletableFuture<Boolean> isPresent(Mail mail) {
         return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName)
-                .thenApply(bool -> !bool);
+                .map(bool -> !bool)
+                .toFuture();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
index c5feefc..a1986e1 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
@@ -27,8 +27,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME;
 
-import java.util.concurrent.CompletableFuture;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,6 +35,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class DeletedMailsDAO {
 
@@ -64,20 +63,21 @@ public class DeletedMailsDAO {
             .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
     }
 
-    CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
-        return executor.executeVoid(insertOne.bind()
+    Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+        return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
             .setString(QUEUE_NAME, mailQueueName.asString())
-            .setString(MAIL_KEY, mailKey.getMailKey()));
+            .setString(MAIL_KEY, mailKey.getMailKey())));
     }
 
-    CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
-        return executor.executeReturnExists(
+    Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+        return Mono.fromCompletionStage(executor.executeReturnExists(
             selectOne.bind()
                 .setString(QUEUE_NAME, mailQueueName.asString())
-                .setString(MAIL_KEY, mailKey.getMailKey()));
+                .setString(MAIL_KEY, mailKey.getMailKey())));
     }
 
-    CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
-        return isDeleted(mailQueueName, mailKey).thenApply(b -> !b);
+    Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
+        return isDeleted(mailQueueName, mailKey)
+            .map(b -> !b);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index fef9ba1..eb1e9cf 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -47,8 +47,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
 
 import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -64,6 +62,10 @@ import org.apache.mailet.Mail;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 public class EnqueuedMailsDAO {
 
@@ -73,10 +75,12 @@ public class EnqueuedMailsDAO {
     private final CassandraUtils cassandraUtils;
     private final CassandraTypesProvider cassandraTypesProvider;
     private final BlobId.Factory blobFactory;
+    private Scheduler scheduler;
 
     @Inject
     EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider,
                      BlobId.Factory blobIdFactory) {
+        this.scheduler = Schedulers.parallel();
         this.executor = new CassandraAsyncExecutor(session);
         this.cassandraUtils = cassandraUtils;
         this.cassandraTypesProvider = cassandraTypesProvider;
@@ -114,13 +118,13 @@ public class EnqueuedMailsDAO {
             .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
     }
 
-    CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
+    Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
         EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem();
         EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext();
         Mail mail = enqueuedItem.getMail();
         MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId();
 
-        return executor.executeVoid(insert.bind()
+        return Mono.fromCompletionStage(executor.executeVoid(insert.bind()
             .setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
             .setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart()))
             .setInt(BUCKET_ID, slicingContext.getBucketId().getValue())
@@ -136,19 +140,20 @@ public class EnqueuedMailsDAO {
             .setString(REMOTE_HOST, mail.getRemoteHost())
             .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
             .setMap(ATTRIBUTES, toRawAttributeMap(mail))
-            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())));
+            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))));
     }
 
-    CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails(
+    Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(
         MailQueueName queueName, Slice slice, BucketId bucketId) {
 
-        return executor.execute(
+        return Mono.fromCompletionStage(executor.execute(
             selectFrom.bind()
                 .setString(QUEUE_NAME, queueName.asString())
                 .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
-                .setInt(BUCKET_ID, bucketId.getValue()))
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)));
+                .setInt(BUCKET_ID, bucketId.getValue())))
+            .map(cassandraUtils::convertToStream)
+            .flatMapMany(Flux::fromStream)
+            .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
index d35b6d7..2f60736 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -69,17 +69,6 @@ public class BucketedSlices {
             return new Slice(sliceStartInstant);
         }
 
-        public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt, Duration windowSize) {
-            long sliceCount = calculateSliceCount(firstSlice, endAt, windowSize);
-            long startAtSeconds =  firstSlice.getStartSliceInstant().getEpochSecond();
-            long sliceWindowSizeInSecond = windowSize.getSeconds();
-
-            return LongStream.range(0, sliceCount)
-                .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
-                .mapToObj(Instant::ofEpochSecond)
-                .map(Slice::of);
-        }
-
         private static long calculateSliceCount(Slice firstSlice, Instant endAt, Duration windowSize) {
             long startAtSeconds =  firstSlice.getStartSliceInstant().getEpochSecond();
             long endAtSeconds = endAt.getEpochSecond();
@@ -104,6 +93,17 @@ public class BucketedSlices {
             return startSliceInstant;
         }
 
+        public Stream<Slice> allSlicesTill(Instant endAt, Duration windowSize) {
+            long sliceCount = calculateSliceCount(this, endAt, windowSize);
+            long startAtSeconds = this.getStartSliceInstant().getEpochSecond();
+            long sliceWindowSizeInSecond = windowSize.getSeconds();
+
+            return LongStream.range(0, sliceCount)
+                .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
+                .mapToObj(Instant::ofEpochSecond)
+                .map(Slice::of);
+        }
+
 
         @Override
         public final boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index ef844c6..c427d44 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Instant;
-import java.util.Optional;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -31,6 +30,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+
 class BrowseStartDAOTest {
 
     private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
@@ -50,37 +51,37 @@ class BrowseStartDAOTest {
 
     @Test
     void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
 
-        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
-        assertThat(firstEnqueuedItemFromQueue2)
+        Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+        assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
             .isEmpty();
     }
 
     @Test
     void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
-        testee.updateBrowseStart(OUT_GOING_2, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
+        testee.updateBrowseStart(OUT_GOING_2, NOW).block();
 
-        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
-        assertThat(firstEnqueuedItemFromQueue2)
+        Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+        assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
             .isNotEmpty();
     }
 
     @Test
     void updateFirstEnqueuedTimeShouldWork() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
 
-        assertThat(testee.selectOne(OUT_GOING_1).join())
+        assertThat(testee.selectOne(OUT_GOING_1).flux().collectList().block())
             .isNotEmpty();
     }
 
     @Test
     void insertInitialBrowseStartShouldInsertFirstInstant() {
-        testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join();
-        testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join();
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW).block();
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).block();
 
-        assertThat(testee.findBrowseStart(OUT_GOING_1).join())
+        assertThat(testee.findBrowseStart(OUT_GOING_1).flux().collectList().block())
             .contains(NOW);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 7eadf9c..d9a451a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,11 +25,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -38,6 +38,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueViewTestFactory {
 
     public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
@@ -69,7 +71,9 @@ public class CassandraMailQueueViewTestFactory {
     public static boolean isInitialized(Session session, MailQueueName mailQueueName) {
         BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
         return browseStartDao.findBrowseStart(mailQueueName)
-            .thenApply(Optional::isPresent)
-            .join();
+            .map(Optional::ofNullable)
+            .switchIfEmpty(Mono.just(Optional.empty()))
+            .block()
+            .isPresent();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
index d9dc69a..e21804f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
@@ -50,58 +50,58 @@ class DeletedMailsDAOTest {
     void markAsDeletedShouldWork() {
         Boolean isDeletedBeforeMark = testee
                 .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-                .join();
+                .block();
         assertThat(isDeletedBeforeMark).isFalse();
 
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
 
         Boolean isDeletedAfterMark = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeletedAfterMark).isTrue();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() {
-        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join();
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() {
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() {
-        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnTrueWhenTableContainsMailItem() {
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isTrue();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
index 3d44db1..13b9e00 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.SoftAssertions.assertSoftly;
 
 import java.time.Instant;
-import java.util.stream.Stream;
+import java.util.List;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -86,11 +86,11 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
-            .join();
+            .block();
 
-        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
+        List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
             .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
-            .join();
+            .collectList().block();
 
         assertThat(selectedEnqueuedMails).hasSize(1);
     }
@@ -108,7 +108,7 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
-            .join();
+            .block();
 
         testee.insert(EnqueuedItemWithSlicingContext.builder()
                 .enqueuedItem(EnqueuedItem.builder()
@@ -121,10 +121,10 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW))
                 .build())
-            .join();
+            .block();
 
-        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
-            .join();
+        List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+            .collectList().block();
 
         assertThat(selectedEnqueuedMails)
             .hasSize(1)

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
index 842216c..1fb6916 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -64,13 +64,13 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
-        assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
+        assertThat(FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
             .containsOnly(FIRST_SLICE);
     }
 
     @Test
     void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
-        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlices = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
 
         assertThat(allSlices)
             .containsExactly(
@@ -81,9 +81,9 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() {
-        Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
-        Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
-        Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheStartOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheMiddleOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheEndWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
 
         Slice [] allSlicesInThreeHours = {
             FIRST_SLICE,
@@ -102,7 +102,7 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() {
-        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlices = FIRST_SLICE_NEXT_TWO_HOUR.allSlicesTill(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
 
         assertThat(allSlices).isEmpty();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message