james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 04/07: JAMES-2630 Add CassandraAsyncExecutor.executeRows
Date Fri, 15 Feb 2019 13:30:39 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 04efa195d5bdf3e12feaa4c33e9369e4d8d6621b
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Thu Feb 14 13:32:04 2019 +0100

    JAMES-2630 Add CassandraAsyncExecutor.executeRows
---
 .../cassandra/utils/CassandraAsyncExecutor.java    |  6 +++
 .../versions/CassandraSchemaVersionDAO.java        |  4 +-
 .../eventstore/cassandra/EventStoreDao.java        |  4 +-
 .../cassandra/mail/CassandraAttachmentDAO.java     |  3 +-
 .../mail/CassandraAttachmentMessageIdDAO.java      |  3 +-
 .../mail/CassandraAttachmentOwnerDAO.java          |  3 +-
 .../mail/CassandraMailboxPathDAOImpl.java          |  3 +-
 .../cassandra/mail/CassandraMessageDAO.java        |  3 +-
 .../cassandra/mail/CassandraMessageIdDAO.java      | 48 +++++++++++-----------
 .../cassandra/mail/CassandraMessageIdDAOTest.java  |  8 +++-
 .../domainlist/cassandra/CassandraDomainList.java  |  4 +-
 .../james/mailrepository/cassandra/UrlsDao.java    |  3 +-
 .../rrt/cassandra/CassandraMappingsSourcesDAO.java |  3 +-
 .../CassandraRecipientRewriteTableDAO.java         |  3 +-
 .../james/sieve/cassandra/CassandraSieveDAO.java   |  3 +-
 .../user/cassandra/CassandraUsersRepository.java   |  4 +-
 .../view/cassandra/CassandraMailQueueBrowser.java  |  6 +--
 .../rabbitmq/view/cassandra/EnqueuedMailsDAO.java  |  3 +-
 18 files changed, 52 insertions(+), 62 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index c1b86e2..6e19b60 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -28,6 +28,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 import net.javacrumbs.futureconverter.java8guava.FutureConverter;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -61,6 +62,11 @@ public class CassandraAsyncExecutor {
                 .flatMap(Mono::justOrEmpty);
     }
 
+    public Flux<Row> executeRows(Statement statement) {
+        return execute(statement)
+            .flatMapMany(Flux::fromIterable);
+    }
+
     public Mono<Optional<Row>> executeSingleRowOptional(Statement statement)
{
         return execute(statement)
             .map(resultSet -> Optional.ofNullable(resultSet.one()));
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
index 6d1ed10..6d958a7 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
@@ -36,7 +36,6 @@ import org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersion
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.utils.UUIDs;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraSchemaVersionDAO {
@@ -65,8 +64,7 @@ public class CassandraSchemaVersionDAO {
     }
 
     public Mono<Optional<SchemaVersion>> getCurrentSchemaVersion() {
-        return cassandraAsyncExecutor.execute(readVersionStatement.bind())
-            .flatMapMany(Flux::fromIterable)
+        return cassandraAsyncExecutor.executeRows(readVersionStatement.bind())
             .map(row -> row.getInt(VALUE))
             .reduce(Math::max)
             .map(SchemaVersion::new)
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index 226d7b5..d99c15a 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -45,7 +45,6 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class EventStoreDao {
@@ -97,10 +96,9 @@ public class EventStoreDao {
     }
 
     public History getEventsOfAggregate(AggregateId aggregateId) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
                 select.bind()
                     .setString(AGGREGATE_ID, aggregateId.asAggregateKey()))
-            .flatMapMany(Flux::fromIterable)
             .map(this::toEvent)
             .collectList()
             .map(History::of)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
index b5f8ec7..f7cdcf6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
@@ -107,11 +107,10 @@ public class CassandraAttachmentDAO {
     }
 
     public Flux<Attachment> retrieveAll() {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
                 selectAllStatement.bind()
                     .setReadTimeoutMillis(configuration.getAttachmentV2MigrationReadTimeout())
                     .setFetchSize(1))
-            .flatMapMany(Flux::fromIterable)
             .map(this::attachment);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
index c10509f..46d664c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
@@ -74,10 +74,9 @@ public class CassandraAttachmentMessageIdDAO {
 
     public Flux<MessageId> getOwnerMessageIds(AttachmentId attachmentId) {
         Preconditions.checkArgument(attachmentId != null);
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
                 selectStatement.bind()
                     .setUUID(ATTACHMENT_ID_AS_UUID, attachmentId.asUUID()))
-            .flatMapMany(Flux::fromIterable)
             .map(this::rowToMessageId);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
index d3c6b65..d994881 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
@@ -76,10 +76,9 @@ public class CassandraAttachmentOwnerDAO {
     }
 
     public Flux<Username> retrieveOwners(AttachmentId attachmentId) {
-        return executor.execute(
+        return executor.executeRows(
                 selectStatement.bind()
                     .setUUID(ID, attachmentId.asUUID()))
-            .flatMapMany(Flux::fromIterable)
             .map(this::toOwner);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
index aae4c7e..99de81b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
@@ -200,8 +200,7 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO
{
     }
 
     public Flux<CassandraIdAndPath> readAll() {
-        return cassandraAsyncExecutor.execute(selectAll.bind())
-            .flatMapMany(Flux::fromIterable)
+        return cassandraAsyncExecutor.executeRows(selectAll.bind())
             .map(this::fromRowToCassandraIdAndPath);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index beab767..6bb6e24 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -399,10 +399,9 @@ public class CassandraMessageDAO {
     }
 
     public Flux<MessageIdAttachmentIds> retrieveAllMessageIdAttachmentIds() {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
             selectAllMessagesWithAttachment.bind()
                 .setReadTimeoutMillis(configuration.getMessageAttachmentIdsReadTimeout()))
-            .flatMapMany(Flux::fromIterable)
             .map(this::fromRow)
             .filter(MessageIdAttachmentIds::hasAttachment);
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 9bfcd4e..e7cafbe 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -59,7 +59,6 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
@@ -211,60 +210,59 @@ public class CassandraMessageIdDAO {
     }
 
     public Mono<Optional<ComposedMessageIdWithMetaData>> retrieve(CassandraId
mailboxId, MessageUid uid) {
-        return selectOneRow(mailboxId, uid).map(this::asOptionalOfCassandraMessageId);
+        return asOptionalOfCassandraMessageId(selectOneRow(mailboxId, uid));
     }
 
-    private Optional<ComposedMessageIdWithMetaData> asOptionalOfCassandraMessageId(ResultSet
resultSet) {
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        return Optional.of(fromRowToComposedMessageIdWithFlags(resultSet.one()));
+    private Mono<Optional<ComposedMessageIdWithMetaData>> asOptionalOfCassandraMessageId(Mono<Row>
row) {
+        return row
+                .map(this::fromRowToComposedMessageIdWithFlags)
+                .map(Optional::of)
+                .switchIfEmpty(Mono.just(Optional.empty()));
     }
 
-    private Mono<ResultSet> selectOneRow(CassandraId mailboxId, MessageUid uid) {
-        return cassandraAsyncExecutor.execute(select.bind()
+    private Mono<Row> selectOneRow(CassandraId mailboxId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeSingleRow(select.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(IMAP_UID, uid.asLong()));
     }
 
     public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId,
MessageRange set) {
+        return retrieveRows(mailboxId, set)
+            .map(this::fromRowToComposedMessageIdWithFlags);
+    }
+
+    private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set) {
         switch (set.getType()) {
         case ALL:
-            return toMessageIds(selectAll(mailboxId));
+            return selectAll(mailboxId);
         case FROM:
-            return toMessageIds(selectFrom(mailboxId, set.getUidFrom()));
+            return selectFrom(mailboxId, set.getUidFrom());
         case RANGE:
-            return toMessageIds(selectRange(mailboxId, set.getUidFrom(), set.getUidTo()));
+            return selectRange(mailboxId, set.getUidFrom(), set.getUidTo());
         case ONE:
-            return toMessageIds(selectOneRow(mailboxId, set.getUidFrom()));
+            return Flux.concat(selectOneRow(mailboxId, set.getUidFrom()));
         }
         throw new UnsupportedOperationException();
     }
 
-    private Mono<ResultSet> selectAll(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.execute(selectAllUids.bind()
+    private Flux<Row> selectAll(CassandraId mailboxId) {
+        return cassandraAsyncExecutor.executeRows(selectAllUids.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()));
     }
 
-    private Mono<ResultSet> selectFrom(CassandraId mailboxId, MessageUid uid) {
-        return cassandraAsyncExecutor.execute(selectUidGte.bind()
+    private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeRows(selectUidGte.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(IMAP_UID, uid.asLong()));
     }
 
-    private Mono<ResultSet> selectRange(CassandraId mailboxId, MessageUid from, MessageUid
to) {
-        return cassandraAsyncExecutor.execute(selectUidRange.bind()
+    private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, MessageUid
to) {
+        return cassandraAsyncExecutor.executeRows(selectUidRange.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(IMAP_UID_GTE, from.asLong())
                 .setLong(IMAP_UID_LTE, to.asLong()));
     }
 
-    private Flux<ComposedMessageIdWithMetaData> toMessageIds(Mono<ResultSet>
results) {
-        return results
-            .flatMapMany(Flux::fromIterable)
-            .map(this::fromRowToComposedMessageIdWithFlags);
-    }
-
     private ComposedMessageIdWithMetaData fromRowToComposedMessageIdWithFlags(Row row) {
         return ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index 588a634..c81fbf8 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -27,6 +27,8 @@ import javax.mail.Flags.Flag;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
@@ -41,8 +43,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import reactor.core.publisher.Flux;
 
 class CassandraMessageIdDAOTest {
+    public static final CassandraModule MODULE = CassandraModule.aggregateModules(
+        CassandraMessageModule.MODULE,
+        CassandraSchemaVersionModule.MODULE);
+
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMessageModule.MODULE);
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
 
     private CassandraMessageId.Factory messageIdFactory;
     private CassandraMessageIdDAO testee;
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
index 1766aeb..4014e39 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
@@ -39,7 +39,6 @@ import org.apache.james.domainlist.lib.AbstractDomainList;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import reactor.core.publisher.Flux;
 
 public class CassandraDomainList extends AbstractDomainList {
     private final CassandraAsyncExecutor executor;
@@ -84,8 +83,7 @@ public class CassandraDomainList extends AbstractDomainList {
 
     @Override
     protected List<Domain> getDomainListInternal() throws DomainListException {
-        return executor.execute(readAllStatement.bind())
-            .flatMapMany(Flux::fromIterable)
+        return executor.executeRows(readAllStatement.bind())
             .map(row -> Domain.of(row.getString(DOMAIN)))
             .collectList()
             .block();
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/UrlsDao.java
b/server/data/data-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/UrlsDao.java
index 4421e49..ac72216 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/UrlsDao.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/UrlsDao.java
@@ -84,8 +84,7 @@ public class UrlsDao {
     }
 
     public Flux<MailRepositoryUrl> retrieveUsedUrls() {
-        return executor.execute(selectAll.bind())
-            .flatMapMany(Flux::fromIterable)
+        return executor.executeRows(selectAll.bind())
             .map(this::toUrl);
     }
 
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
index 21a81e3..1002b1f 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -99,10 +99,9 @@ public class CassandraMappingsSourcesDAO {
     }
 
     public Flux<MappingSource> retrieveSources(Mapping mapping) {
-        return executor.execute(retrieveSourcesStatement.bind()
+        return executor.executeRows(retrieveSourcesStatement.bind()
             .setString(MAPPING_TYPE, mapping.getType().asPrefix())
             .setString(MAPPING_VALUE, mapping.getMappingValue()))
-            .flatMapMany(Flux::fromIterable)
             .map(row -> MappingSource.parse(row.getString(SOURCE)));
     }
 
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index 6c1078e..72eaf48 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -116,8 +116,7 @@ public class CassandraRecipientRewriteTableDAO {
     }
 
     public Flux<Pair<MappingSource, Mapping>> getAllMappings() {
-        return executor.execute(retrieveAllMappingsStatement.bind())
-            .flatMapMany(Flux::fromIterable)
+        return executor.executeRows(retrieveAllMappingsStatement.bind())
             .map(row -> Pair.of(
                 MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)),
                 Mapping.of(row.getString(MAPPING))));
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
index e4895ad..de1f5ac 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
@@ -106,10 +106,9 @@ public class CassandraSieveDAO {
     }
 
     public Flux<ScriptSummary> listScripts(User user) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
                 selectScriptsStatement.bind()
                     .setString(USER_NAME, user.asString()))
-            .flatMapMany(Flux::fromIterable)
             .map(row -> new ScriptSummary(
                     new ScriptName(row.getString(SCRIPT_NAME)),
                     row.getBool(IS_ACTIVE)));
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
index 1fc36ff..eeaf265 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
@@ -52,7 +52,6 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
-import reactor.core.publisher.Flux;
 
 public class CassandraUsersRepository extends AbstractUsersRepository {
 
@@ -180,8 +179,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository
{
 
     @Override
     public Iterator<String> list() throws UsersRepositoryException {
-        return executor.execute(listStatement.bind())
-            .flatMapMany(Flux::fromIterable)
+        return executor.executeRows(listStatement.bind())
             .map(row -> row.getString(REALNAME))
             .toIterable()
             .iterator();
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 29da254..9054e1b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -26,7 +26,6 @@ import java.time.Clock;
 import java.time.Instant;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -109,7 +108,6 @@ public class CassandraMailQueueBrowser {
         return browseStartDao.findBrowseStart(queueName)
             .flatMapMany(this::allSlicesStartingAt)
             .flatMapSequential(slice -> browseSlice(queueName, slice))
-            .flatMapSequential(Flux::fromIterable)
             .subscribeOn(Schedulers.parallel());
     }
 
@@ -131,11 +129,11 @@ public class CassandraMailQueueBrowser {
         return mail;
     }
 
-    private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName
queueName, Slice slice) {
+    private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName,
Slice slice) {
         return
             allBucketIds()
                 .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
-                .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+                .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
     private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName,
Slice slice, BucketId bucketId) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index 75101c4..009cfb7 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -142,12 +142,11 @@ public class EnqueuedMailsDAO {
     Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(
         MailQueueName queueName, Slice slice, BucketId bucketId) {
 
-        return executor.execute(
+        return executor.executeRows(
                 selectFrom.bind()
                     .setString(QUEUE_NAME, queueName.asString())
                     .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
                     .setInt(BUCKET_ID, bucketId.getValue()))
-            .flatMapMany(Flux::fromIterable)
             .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message