james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adup...@apache.org
Subject [3/7] james-project git commit: JAMES-1948 chain method from CompletableFutureUtile
Date Thu, 02 Mar 2017 13:44:45 GMT
JAMES-1948 chain method from CompletableFutureUtile


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/4afa12fe
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/4afa12fe
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/4afa12fe

Branch: refs/heads/master
Commit: 4afa12fe396fc794d19dec03e226d52e3cc02ed8
Parents: 8dba042
Author: Luc DUZAN <lduzan@linagora.com>
Authored: Mon Feb 27 16:38:13 2017 +0100
Committer: Luc DUZAN <lduzan@linagora.com>
Committed: Thu Mar 2 12:31:25 2017 +0100

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 55 ++++++++--------
 .../apache/james/util/FluentFutureStream.java   | 69 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/4afa12fe/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 11161e5..6dadab6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -54,11 +54,11 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -177,19 +177,24 @@ public class CassandraMessageMapper implements MessageMapper {
         CompletableFuture<Stream<Pair<CassandraMessageDAO.MessageWithoutAttachment,
Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>>>
             messageRepresentations = messageDAO.retrieveMessages(messageIds, fetchType, limit);
         if (fetchType == FetchType.Body || fetchType == FetchType.Full) {
-            return messageRepresentations
-                .thenCompose(stream -> CompletableFutureUtil.allOf(
-                    stream.map(pair -> attachmentLoader.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
-                        .thenApply(attachments -> Pair.of(pair.getLeft(), attachments)))
-                ))
-                .thenApply(stream -> stream.map(Throwing.function(pair -> pair.getLeft()
-                    .toMailboxMessage(pair.getRight()
-                        .stream()
-                        .collect(Guavate.toImmutableList())))));
+            return FluentFutureStream.of(messageRepresentations)
+                .thenComposeOnAll(pair ->
+                    attachmentLoader.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
+                        .thenApply(attachments -> Pair.of(pair.getLeft(), attachments))
+                )
+                .map(pair ->
+                    pair.getLeft()
+                        .toMailboxMessage(pair.getRight()
+                            .stream()
+                            .collect(Guavate.toImmutableList())))
+                .completableFuture();
         } else {
-            return messageRepresentations.thenApply(
-                stream -> stream.map(pair ->
-                pair.getLeft().toMailboxMessage(ImmutableList.of())));
+            return FluentFutureStream.of(messageRepresentations)
+                .map(pair ->
+                    pair
+                        .getLeft()
+                        .toMailboxMessage(ImmutableList.of()))
+                .completableFuture();
         }
     }
 
@@ -208,27 +213,25 @@ public class CassandraMessageMapper implements MessageMapper {
             .orElse(null);
     }
 
-    @Override
     public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox
mailbox, MessageRange messageRange) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        CompletableFuture<Stream<SimpleMailboxMessage>> messagesToDelete =
-            CompletableFutureUtil.flatMapOnAll(
-                deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange),
+        return FluentFutureStream.of(deletedMessageDAO.retrieveDeletedMessage(mailboxId,
messageRange))
+            .thenComposeOnAll(
                 messageId ->
                     messageIdDAO.retrieve(mailboxId, messageId))
-
-            .thenApply(ids -> ids
-                .flatMap(OptionalConverter::toStream)
-                .collect(Guavate.toImmutableList()))
-
-            .thenCompose(ids -> retrieveMessages(ids, FetchType.Metadata, Optional.empty()));
-
-        return CompletableFutureUtil.performOnAll(messagesToDelete, message ->
-                deleteAsFuture(message, mailboxId))
+            .flatMap(OptionalConverter::toStream)
+            .thenCompose(ids ->
+                retrieveMessages(
+                    ids.collect(Guavate.toImmutableList()),
+                    FetchType.Metadata,
+                    Optional.empty())
+            )
+            .performOnAll(message -> deleteAsFuture(message, mailboxId))
             .join()
             .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new));
     }
+
     @Override
     public MessageMetaData move(Mailbox destinationMailbox, MailboxMessage original) throws
MailboxException {
         CassandraId originalMailboxId = (CassandraId) original.getMailboxId();

http://git-wip-us.apache.org/repos/asf/james-project/blob/4afa12fe/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
new file mode 100644
index 0000000..85e6f03
--- /dev/null
+++ b/server/container/util-java8/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+public class FluentFutureStream<T> {
+
+    private final CompletableFuture<Stream<T>> completableFuture;
+
+    public static <T> FluentFutureStream<T> of(CompletableFuture<Stream<T>>
completableFuture) {
+        return new FluentFutureStream<>(completableFuture);
+    }
+
+    private FluentFutureStream(CompletableFuture<Stream<T>> completableFuture)
{
+        this.completableFuture = completableFuture;
+    }
+
+    public FluentFutureStream<T> performOnAll(Function<T, CompletableFuture<Void>>
action) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.performOnAll(completableFuture(), action));
+    }
+
+    public <U> FluentFutureStream<U> map(Function<T, U> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.map(completableFuture(), function));
+    }
+
+    public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<U>>
function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), function));
+    }
+
+    public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>>
function) {
+        return FluentFutureStream.of(completableFuture().thenApply(stream ->
+            stream.flatMap(function)));
+    }
+
+    public <U> FluentFutureStream<U> thenCompose(Function<Stream<T>,
CompletableFuture<Stream<U>>> function) {
+        return FluentFutureStream.of(completableFuture().thenCompose(function));
+    }
+
+    public CompletableFuture<Stream<T>> completableFuture() {
+        return this.completableFuture;
+    }
+
+    public Stream<T> join() {
+        return completableFuture().join();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message