james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [james-project] 08/12: JAMES-2630 Ensure Iterable flux get published on ElasticScheduler before rate limiting
Date Mon, 18 Feb 2019 08:40:36 GMT
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c5932ce62f601dc9e940972ba091404b1acfa850
Author: Benoit Tellier <btellier@linagora.com>
AuthorDate: Thu Feb 14 16:13:12 2019 +0700

    JAMES-2630 Ensure Iterable flux get published on ElasticScheduler before rate limiting
---
 .../org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java   | 2 ++
 .../apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java  | 3 +++
 2 files changed, 5 insertions(+)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 29e9738..ae37a96 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -91,6 +91,7 @@ import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAO {
@@ -232,6 +233,7 @@ public class CassandraMessageDAO {
 
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData>
messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
+            .publishOn(Schedulers.elastic())
             .limitRate(configuration.getMessageReadChunkSize())
             .flatMap(id -> retrieveRow(id, fetchType)
                 .flatMap(resultSet -> message(resultSet, id, fetchType)));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index fb0eb4f..f74b2ca 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -56,6 +56,7 @@ import com.google.common.collect.Multimap;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
@@ -91,6 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType
fetchType) {
         return Flux.fromStream(messageIds.stream())
+            .publishOn(Schedulers.elastic())
             .limitRate(cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId,
Optional.empty()))
             .collectList()
@@ -178,6 +180,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         Flux.fromIterable(ids.asMap()
             .entrySet())
             .limitRate(cassandraConfiguration.getExpungeChunkSize())
+            .publishOn(Schedulers.elastic())
             .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
             .then()
             .block();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message