james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [11/12] james-project git commit: JAMES-2525 asynchronous ObjectStorageBlobsDAO
Date Tue, 27 Nov 2018 02:06:38 GMT
JAMES-2525 asynchronous ObjectStorageBlobsDAO


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/30c514a1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/30c514a1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/30c514a1

Branch: refs/heads/master
Commit: 30c514a1d427a08208c5f7e1feef43aa7fd5b06e
Parents: 4f58df6
Author: tran tien duc <dtran@linagora.com>
Authored: Mon Nov 26 19:40:46 2018 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Tue Nov 27 09:05:10 2018 +0700

----------------------------------------------------------------------
 .../objectstorage/ObjectStorageBlobsDAO.java    | 24 ++++++++---------
 .../ObjectStorageBlobsDAOTest.java              | 27 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/30c514a1/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index ea71d10..b83d2c4 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -96,26 +96,27 @@ public class ObjectStorageBlobsDAO implements BlobStore {
         Preconditions.checkNotNull(data);
 
         BlobId tmpId = blobIdFactory.randomId();
-        BlobId id = save(data, tmpId);
-        updateBlobId(tmpId, id);
-
-        return CompletableFuture.completedFuture(id);
+        return save(data, tmpId)
+            .thenCompose(id -> updateBlobId(tmpId, id));
     }
 
-    private void updateBlobId(BlobId from, BlobId to) {
+    private CompletableFuture<BlobId> updateBlobId(BlobId from, BlobId to) {
         String containerName = this.containerName.value();
-        blobStore.copyBlob(containerName, from.asString(), containerName, to.asString(),
-            CopyOptions.NONE);
-        blobStore.removeBlob(containerName, from.asString());
+        return CompletableFuture
+            .supplyAsync(() -> blobStore.copyBlob(containerName, from.asString(), containerName, to.asString(), CopyOptions.NONE))
+            .thenAcceptAsync(any -> blobStore.removeBlob(containerName, from.asString()))
+            .thenApply(any -> to);
     }
 
-    private BlobId save(InputStream data, BlobId id) {
+    private CompletableFuture<BlobId> save(InputStream data, BlobId id) {
         String containerName = this.containerName.value();
         HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
         Payload payload = payloadCodec.write(hashingInputStream);
         Blob blob = blobStore.blobBuilder(id.asString()).payload(payload).build();
-        blobStore.putBlob(containerName, blob);
-        return blobIdFactory.from(hashingInputStream.hash().toString());
+
+        return CompletableFuture
+            .supplyAsync(() -> blobStore.putBlob(containerName, blob))
+            .thenApply(any -> blobIdFactory.from(hashingInputStream.hash().toString()));
     }
 
     @Override
@@ -147,4 +148,3 @@ public class ObjectStorageBlobsDAO implements BlobStore {
         blobStore.deleteContainer(containerName.value());
     }
 }
-

http://git-wip-us.apache.org/repos/asf/james-project/blob/30c514a1/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
index 265bc9f..78c6a8a 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -46,8 +47,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.google.common.base.Strings;
+
 @ExtendWith(DockerSwiftExtension.class)
 public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
+    private static final String BIG_STRING = Strings.repeat("big blob content", 10 * 1024);
     private static final TenantName TENANT_NAME = TenantName.of("test");
     private static final UserName USER_NAME = UserName.of("tester");
     private static final Credentials PASSWORD = Credentials.of("testing");
@@ -158,5 +162,28 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
         assertThat(blobStore.containerExists(containerName.value()))
             .isFalse();
     }
+
+    @Test
+    void saveBytesShouldNotCompleteWhenDoesNotAwait() {
+        // String need to be big enough to get async thread busy hence could not return result instantly
+        CompletableFuture<BlobId> blobIdFuture = testee.save(BIG_STRING.getBytes(StandardCharsets.UTF_8));
+        assertThat(blobIdFuture)
+            .isNotCompleted();
+    }
+
+    @Test
+    void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
+        CompletableFuture<BlobId> blobIdFuture = testee.save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)));
+        assertThat(blobIdFuture)
+            .isNotCompleted();
+    }
+
+    @Test
+    void readBytesShouldNotCompleteWhenDoesNotAwait() {
+        BlobId blobId = testee().save(BIG_STRING.getBytes(StandardCharsets.UTF_8)).join();
+        CompletableFuture<byte[]> resultFuture = testee.readBytes(blobId);
+        assertThat(resultFuture)
+            .isNotCompleted();
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message