james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [james-project] 05/12: JAMES-2630 Use Reactor to enforce backend rate limits
Date Mon, 18 Feb 2019 08:40:33 GMT
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bc958e7138bab21ffd356294b3a81f63699c71ec
Author: Benoit Tellier <btellier@linagora.com>
AuthorDate: Wed Feb 13 10:54:28 2019 +0700

    JAMES-2630 Use Reactor to enforce backend rate limits
    
    Previous code used to perform computation by chunks.
    
    This had the following drawbacks:
     - It makes the code hard to read as we need to handle a collection of entities and not
a single entity
     - This is sub-optimal as the chunk needs to be fully processed before the next chunk
computation begins. With Reactor, this is better as a constant number of entities is continuously
processed.
---
 .../mailbox/cassandra/mail/AttachmentLoader.java   |   5 +-
 .../cassandra/mail/CassandraMessageDAO.java        |  11 +-
 .../cassandra/mail/CassandraMessageIdMapper.java   |  33 +++---
 .../cassandra/mail/CassandraMessageMapper.java     |   3 +-
 .../mail/CassandraAttachmentOwnerDAOTest.java      |  14 +--
 .../apache/james/util/streams/JamesCollectors.java |  80 --------------
 .../james/util/streams/JamesCollectorsTest.java    | 115 ---------------------
 7 files changed, 23 insertions(+), 238 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index b3eef40..ab281d0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -25,11 +25,12 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -41,7 +42,7 @@ public class AttachmentLoader {
         this.attachmentMapper = attachmentMapper;
     }
 
-    public Mono<SimpleMailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment,
Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType
fetchType) {
+    public Mono<MailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment,
Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType
fetchType) {
         return loadAttachments(messageRepresentation.getRight(), fetchType)
             .map(attachments -> messageRepresentation.getLeft().toMailboxMessage(attachments));
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 03ec22c..29e9738 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -40,7 +40,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.T
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -73,7 +72,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
-import org.apache.james.util.streams.JamesCollectors;
 import org.apache.james.util.streams.Limit;
 
 import com.datastax.driver.core.BoundStatement;
@@ -233,13 +231,8 @@ public class CassandraMessageDAO {
     }
 
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData>
messageIds, FetchType fetchType, Limit limit) {
-        return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())
-            .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())))
-            .flatMap(ids -> rowToMessages(fetchType, ids));
-    }
-
-    private Flux<MessageResult> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData>
ids) {
-        return Flux.fromIterable(ids)
+        return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
+            .limitRate(configuration.getMessageReadChunkSize())
             .flatMap(id -> retrieveRow(id, fetchType)
                 .flatMap(resultSet -> message(resultSet, id, fetchType)));
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index bfc3afb..fb0eb4f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -46,16 +45,15 @@ import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.JamesCollectors;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -92,27 +90,20 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType
fetchType) {
-        return messageIds.stream()
-            .collect(JamesCollectors.chunker(cassandraConfiguration.getMessageReadChunkSize()))
-            .flatMap(chuckedIds -> findAsStream(messageIds, fetchType))
-            .collect(Guavate.toImmutableList());
-    }
-
-    private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> messageIds,
FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
-                .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId,
Optional.empty()))
-                .collectList()
-                .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds,
fetchType, Limit.unlimited()))
-                .filter(CassandraMessageDAO.MessageResult::isFound)
-                .map(CassandraMessageDAO.MessageResult::message)
-                .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation,
fetchType))
-                .flatMap(this::keepMessageIfMailboxExists)
-                .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
-                .block()
-                .stream();
+            .limitRate(cassandraConfiguration.getMessageReadChunkSize())
+            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId,
Optional.empty()))
+            .collectList()
+            .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds,
fetchType, Limit.unlimited()))
+            .filter(CassandraMessageDAO.MessageResult::isFound)
+            .map(CassandraMessageDAO.MessageResult::message)
+            .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation,
fetchType))
+            .flatMap(this::keepMessageIfMailboxExists)
+            .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
+            .block();
     }
 
-    private Mono<SimpleMailboxMessage> keepMessageIfMailboxExists(SimpleMailboxMessage
message) {
+    private Mono<MailboxMessage> keepMessageIfMailboxExists(MailboxMessage message)
{
         CassandraId cassandraId = (CassandraId) message.getMailboxId();
         return mailboxDAO.retrieveMailbox(cassandraId)
             .map(any -> message)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ba66061..908e739 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -174,7 +175,7 @@ public class CassandraMessageMapper implements MessageMapper {
             .block();
     }
 
-    private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData>
messageIds, FetchType fetchType, Limit limit) {
+    private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData>
messageIds, FetchType fetchType, Limit limit) {
         return messageDAO.retrieveMessages(messageIds, fetchType, limit)
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
index bf325af..7bf201c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
@@ -21,14 +21,11 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.stream.IntStream;
-
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.store.mail.model.Username;
-import org.apache.james.util.streams.JamesCollectors;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,13 +75,10 @@ class CassandraAttachmentOwnerDAOTest {
     void retrieveOwnersShouldNotThrowWhenMoreReferencesThanPaging() {
         int referenceCountExceedingPaging = 5050;
 
-        IntStream.range(0, referenceCountExceedingPaging)
-            .boxed()
-            .collect(JamesCollectors.chunker(128))
-            .forEach(chunk -> Flux.fromIterable(chunk)
-                    .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner"
+ i)))
-                    .then()
-                    .block());
+        Flux.range(0, referenceCountExceedingPaging)
+            .limitRate(128)
+            .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner"
+ i)))
+            .blockLast();
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).toIterable())
             .hasSize(referenceCountExceedingPaging);
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
b/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
deleted file mode 100644
index e705063..0000000
--- a/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.util.streams;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
-import java.util.function.BinaryOperator;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collector;
-import java.util.stream.Stream;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-public class JamesCollectors {
-    public static <D> Collector<D, ?, Stream<Collection<D>>> chunker(int
chunkSize) {
-        return new ChunkCollector<>(chunkSize);
-    }
-
-    public static class ChunkCollector<D> implements Collector<D, Multimap<Integer,
D>, Stream<Collection<D>>> {
-        private final int chunkSize;
-        private final AtomicInteger counter;
-
-        private ChunkCollector(int chunkSize) {
-            Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be strictly positive");
-            this.chunkSize = chunkSize;
-            this.counter = new AtomicInteger(-1);
-        }
-
-        @Override
-        public Supplier<Multimap<Integer, D>> supplier() {
-            return ArrayListMultimap::create;
-        }
-
-        @Override
-        public BiConsumer<Multimap<Integer, D>, D> accumulator() {
-            return (accumulator, value) -> accumulator.put(counter.incrementAndGet() /
chunkSize, value);
-        }
-
-        @Override
-        public BinaryOperator<Multimap<Integer, D>> combiner() {
-            return (accumulator1, accumulator2) -> {
-                accumulator1.putAll(accumulator2);
-                return accumulator1;
-            };
-        }
-
-        @Override
-        public Function<Multimap<Integer, D>, Stream<Collection<D>>>
finisher() {
-            return accumulator -> accumulator.asMap().values().stream();
-        }
-
-        @Override
-        public Set<Characteristics> characteristics() {
-            return ImmutableSet.of();
-        }
-    }
-}
diff --git a/server/container/util/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
b/server/container/util/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
deleted file mode 100644
index 8cca6bf..0000000
--- a/server/container/util/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.util.streams;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-import org.junit.jupiter.api.Test;
-
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
-
-public class JamesCollectorsTest {
-
-    @Test
-    void chunkerShouldAcceptEmptyStrem() {
-        Stream<Integer> emptyStream = Stream.of();
-
-        assertThat(emptyStream.collect(JamesCollectors.chunker(10))
-            .collect(Guavate.toImmutableList()))
-            .isEmpty();
-    }
-
-    @Test
-    void chunkerShouldThrowOnZeroChunkSize() {
-        assertThatIllegalArgumentException()
-            .isThrownBy(() -> JamesCollectors.chunker(0));
-    }
-
-    @Test
-    void chunkerShouldThrowOnNegativeChunkSize() {
-        assertThatIllegalArgumentException()
-            .isThrownBy(() -> JamesCollectors.chunker(-1));
-    }
-
-    @Test
-    void chunkerShouldChunkMonoValueStreams() {
-        Stream<Integer> monoValueStream = Stream.of(1);
-
-        List<List<Integer>> values = monoValueStream.collect(JamesCollectors.chunker(10))
-            .map(ImmutableList::copyOf)
-            .collect(Guavate.toImmutableList());
-        assertThat(values)
-            .isEqualTo(ImmutableList.of(ImmutableList.of(1)));
-    }
-
-    @Test
-    void chunkerShouldChunkStreamsSmallerThanChunkSize() {
-        Stream<Integer> stream = Stream.of(1, 2);
-
-        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .map(ImmutableList::copyOf)
-            .collect(Guavate.toImmutableList());
-        assertThat(values)
-            .isEqualTo(ImmutableList.of(ImmutableList.of(1, 2)));
-    }
-
-    @Test
-    void chunkerShouldChunkStreamsAsBigAsChunkSize() {
-        Stream<Integer> stream = Stream.of(1, 2, 3);
-
-        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .map(ImmutableList::copyOf)
-            .collect(Guavate.toImmutableList());
-        assertThat(values)
-            .isEqualTo(ImmutableList.of(ImmutableList.of(1, 2, 3)));
-    }
-
-    @Test
-    void chunkerShouldChunkStreamsBiggerThanChunkSize() {
-        Stream<Integer> stream = Stream.of(1, 2, 3, 4);
-
-        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .map(ImmutableList::copyOf)
-            .collect(Guavate.toImmutableList());
-        assertThat(values)
-            .isEqualTo(ImmutableList.of(
-                ImmutableList.of(1, 2, 3),
-                ImmutableList.of(4)));
-    }
-
-    @Test
-    void chunkerShouldChunkInSeveralBuckets() {
-        Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7);
-
-        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
-            .map(ImmutableList::copyOf)
-            .collect(Guavate.toImmutableList());
-        assertThat(values)
-            .isEqualTo(ImmutableList.of(
-                ImmutableList.of(1, 2, 3),
-                ImmutableList.of(4, 5, 6),
-                ImmutableList.of(7)));
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message