james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [04/19] james-project git commit: JAMES-2541 Concurrent version of Store<T>
Date Mon, 10 Sep 2018 10:34:07 GMT
JAMES-2541 Concurrent version of Store<T>

One will not do what he wants with concurrency when on top of a fixed thread pool... Previous
version of the code dead-locked.

Using byte as an intermediate representation I can interact with the object store in an async
fashion (and avoid the dead lock)


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b4458677
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b4458677
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b4458677

Branch: refs/heads/master
Commit: b44586778a54cd152d3c76fa70c3d3cb1ca2bb44
Parents: 6a29366
Author: Benoit Tellier <btellier@linagora.com>
Authored: Thu Sep 6 17:31:25 2018 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Mon Sep 10 17:17:41 2018 +0700

----------------------------------------------------------------------
 .../java/org/apache/james/blob/api/Store.java   | 22 ++++++++------
 .../james/blob/mail/MimeMessageStore.java       |  6 ++--
 .../james/blob/mail/MimeMessageStoreTest.java   | 10 +++---
 .../cassandra/CassandraMailRepository.java      | 32 +++++++++++---------
 4 files changed, 37 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 4665c6b..47c4988 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -62,12 +62,12 @@ public interface Store<T> {
     }
 
     interface Decoder<T> {
-        T decode(Map<BlobType, InputStream> streams);
+        T decode(Map<BlobType, byte[]> streams);
     }
 
-    Map<BlobType, BlobId> save(T t);
+    CompletableFuture<Map<BlobType, BlobId>> save(T t);
 
-    T read(Map<BlobType, BlobId> blobIds);
+    CompletableFuture<T> read(Map<BlobType, BlobId> blobIds);
 
     class Impl<T> implements Store<T> {
         private final Encoder<T> encoder;
@@ -81,14 +81,14 @@ public interface Store<T> {
         }
 
         @Override
-        public Map<BlobType, BlobId> save(T t) {
+        public CompletableFuture<Map<BlobType, BlobId>> save(T t) {
             return FluentFutureStream.of(
                 encoder.encode(t)
                     .entrySet()
                     .stream()
                     .map(this::saveEntry))
-                .join()
-                .collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue));
+                .completableFuture()
+                .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey,
Pair::getValue)));
         }
 
         private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Map.Entry<BlobType,
InputStream> entry) {
@@ -97,12 +97,14 @@ public interface Store<T> {
         }
 
         @Override
-        public T read(Map<BlobType, BlobId> blobIds) {
-            ImmutableMap<BlobType, InputStream> binaries = blobIds.entrySet()
+        public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) {
+            CompletableFuture<ImmutableMap<BlobType, byte[]>> binaries = FluentFutureStream.of(blobIds.entrySet()
                 .stream()
-                .map(entry -> Pair.of(entry.getKey(), blobStore.read(entry.getValue())))
+                .map(entry -> blobStore.readBytes(entry.getValue())
+                    .thenApply(bytes -> Pair.of(entry.getKey(), bytes))))
                 .collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue));
-            return decoder.decode(binaries);
+
+            return binaries.thenApply(decoder::decode);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
----------------------------------------------------------------------
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index 75cd973..5c2e444 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -106,15 +106,15 @@ public class MimeMessageStore extends Store.Impl<MimeMessage>
{
 
     static class MailDecoder implements Decoder<MimeMessage> {
         @Override
-        public MimeMessage decode(Map<BlobType, InputStream> streams) {
+        public MimeMessage decode(Map<BlobType, byte[]> streams) {
             Preconditions.checkNotNull(streams);
             Preconditions.checkArgument(streams.containsKey(HEADER_BLOB_TYPE));
             Preconditions.checkArgument(streams.containsKey(BODY_BLOB_TYPE));
 
             return toMimeMessage(
                 new SequenceInputStream(
-                    streams.get(HEADER_BLOB_TYPE),
-                    streams.get(BODY_BLOB_TYPE)));
+                    new ByteArrayInputStream(streams.get(HEADER_BLOB_TYPE)),
+                    new ByteArrayInputStream(streams.get(BODY_BLOB_TYPE))));
         }
 
         private MimeMessage toMimeMessage(InputStream inputStream) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
index 3bc3440..5a896ea 100644
--- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
+++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
@@ -70,9 +70,9 @@ class MimeMessageStoreTest {
             .setText("Important mail content")
             .build();
 
-        Map<Store.BlobType, BlobId> parts = testee.save(message);
+        Map<Store.BlobType, BlobId> parts = testee.save(message).join();
 
-        MimeMessage retrievedMessage = testee.read(parts);
+        MimeMessage retrievedMessage = testee.read(parts).join();
 
         assertThat(MimeMessageUtil.asString(retrievedMessage))
             .isEqualTo(MimeMessageUtil.asString(message));
@@ -86,9 +86,9 @@ class MimeMessageStoreTest {
             .setSubject("Important Mail")
             .build();
 
-        Map<Store.BlobType, BlobId> parts = testee.save(message);
+        Map<Store.BlobType, BlobId> parts = testee.save(message).join();
 
-        MimeMessage retrievedMessage = testee.read(parts);
+        MimeMessage retrievedMessage = testee.read(parts).join();
 
         assertThat(MimeMessageUtil.asString(retrievedMessage))
             .isEqualTo(MimeMessageUtil.asString(message));
@@ -105,7 +105,7 @@ class MimeMessageStoreTest {
             .setText("Important mail content")
             .build();
 
-        Map<Store.BlobType, BlobId> parts = testee.save(message);
+        Map<Store.BlobType, BlobId> parts = testee.save(message).join();
 
         SoftAssertions.assertSoftly(
             softly -> {

http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index d2c8735..d0b4cbe 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
 import javax.mail.MessagingException;
-import javax.mail.internet.MimeMessage;
 
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
@@ -34,9 +33,11 @@ import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableMap;
 
 public class CassandraMailRepository implements MailRepository {
@@ -61,10 +62,10 @@ public class CassandraMailRepository implements MailRepository {
     public MailKey store(Mail mail) throws MessagingException {
         MailKey mailKey = MailKey.forMail(mail);
 
-
-        Map<Store.BlobType, BlobId> parts = mimeMessageStore.save(mail.getMessage());
-
-        mailDAO.store(url, mail, parts.get(MimeMessageStore.HEADER_BLOB_TYPE), parts.get(MimeMessageStore.BODY_BLOB_TYPE))
+        mimeMessageStore.save(mail.getMessage())
+            .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail,
+                parts.get(MimeMessageStore.HEADER_BLOB_TYPE),
+                parts.get(MimeMessageStore.BODY_BLOB_TYPE))))
             .thenCompose(any -> keysDAO.store(url, mailKey))
             .thenCompose(this::increaseSizeIfStored)
             .join();
@@ -88,21 +89,22 @@ public class CassandraMailRepository implements MailRepository {
 
     @Override
     public Mail retrieve(MailKey key) {
-        return mailDAO.read(url, key)
-                .thenApply(optional -> optional.map(this::toMail))
+        return CompletableFutureUtil
+            .unwrap(mailDAO.read(url, key)
+                .thenApply(optional -> optional.map(this::toMail)))
             .join()
             .orElse(null);
     }
 
-    private Mail toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
-        MimeMessage mimeMessage = mimeMessageStore
-            .read(ImmutableMap.of(
-                MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(),
-                MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId()));
+    private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO)
{
+        Map<Store.BlobType, BlobId> parts = ImmutableMap.of(
+            MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(),
+            MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId());
 
-        return mailDTO.getMailBuilder()
-            .mimeMessage(mimeMessage)
-            .build();
+        return mimeMessageStore.read(parts)
+            .thenApply(mimeMessage -> mailDTO.getMailBuilder()
+                .mimeMessage(mimeMessage)
+                .build());
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message