james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adup...@apache.org
Subject [2/5] james-project git commit: JAMES-2583 completely fallback to secondary
Date Wed, 07 Nov 2018 13:13:05 GMT
JAMES-2583 completely fallback to secondary

when primary fails, completes exceptionally, or returns empty


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/45624137
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/45624137
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/45624137

Branch: refs/heads/master
Commit: 45624137336239289014e9f441732ac28bb64893
Parents: aa064ef
Author: tran tien duc <dtran@linagora.com>
Authored: Mon Nov 5 13:28:50 2018 +0700
Committer: Antoine Duprat <aduprat@linagora.com>
Committed: Wed Nov 7 14:12:19 2018 +0100

----------------------------------------------------------------------
 .../james/blob/joining/JoiningBlobStore.java    | 36 ++++++++++-
 .../blob/joining/JoiningBlobStoreTest.java      | 66 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
index 4548007..aab4cdb 100644
--- a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
+++ b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
@@ -25,6 +25,7 @@ import java.io.PushbackInputStream;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
@@ -50,12 +51,26 @@ public class JoiningBlobStore implements BlobStore {
 
     @Override
     public CompletableFuture<BlobId> save(byte[] data) {
-        return primaryBlobStore.save(data);
+        try {
+            return saveToPrimaryFallbackIfFails(
+                primaryBlobStore.save(data),
+                () -> secondaryBlobStore.save(data));
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while saving bytes data, fall back to secondary blob store", e);
+            return secondaryBlobStore.save(data);
+        }
     }
 
     @Override
     public CompletableFuture<BlobId> save(InputStream data) {
-        return primaryBlobStore.save(data);
+        try {
+            return saveToPrimaryFallbackIfFails(
+                primaryBlobStore.save(data),
+                () -> secondaryBlobStore.save(data));
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while saving InputStream data, fall back to secondary blob store", e);
+            return secondaryBlobStore.save(data);
+        }
     }
 
     @Override
@@ -103,11 +118,28 @@ public class JoiningBlobStore implements BlobStore {
             .thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, blobId));
     }
 
+    private CompletableFuture<BlobId> saveToPrimaryFallbackIfFails(
+        CompletableFuture<BlobId> primarySavingOperation,
+        Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) {
+
+        return primarySavingOperation
+            .thenApply(Optional::ofNullable)
+            .exceptionally(this::logAndReturnEmptyOptional)
+            .thenCompose(maybeBlobId -> saveToSecondaryIfNeeded(maybeBlobId, fallbackSavingOperationSupplier));
+    }
+
     private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) {
         LOGGER.error("primary completed exceptionally, fall back to second blob store", throwable);
         return Optional.empty();
     }
 
+    private CompletableFuture<BlobId> saveToSecondaryIfNeeded(Optional<BlobId> maybeBlobId,
+                                                              Supplier<CompletableFuture<BlobId>> saveToSecondarySupplier) {
+        return maybeBlobId
+            .map(CompletableFuture::completedFuture)
+            .orElseGet(saveToSecondarySupplier);
+    }
+
     private CompletableFuture<byte[]> readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId blodId) {
         return readFromPrimaryResult
             .filter(this::hasContent)

http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
index a26dff5..329dd92 100644
--- a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
+++ b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.james.blob.api.BlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.memory.MemoryBlobStore;
 import org.apache.james.util.CompletableFutureUtil;
+import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
@@ -110,6 +111,71 @@ class JoiningBlobStoreTest implements BlobStoreContract {
     }
 
     @Nested
+    class PrimarySaveThrowsExceptionDirectly {
+
+        @Test
+        void saveShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception {
+            MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+        @Test
+        void saveInputStreamShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception {
+            MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+    }
+
+    @Nested
+    class PrimarySaveCompletesExceptionally {
+
+        @Test
+        void saveShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception {
+            MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+        @Test
+        void saveInputStreamShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception {
+            MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+    }
+
+    @Nested
     class PrimaryReadThrowsExceptionDirectly {
 
         @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message