james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adup...@apache.org
Subject [1/8] james-project git commit: MAILBOX-285 Cassandra MessageMapper should support concurrent flags updates
Date Wed, 18 Jan 2017 07:51:55 GMT
Repository: james-project
Updated Branches:
  refs/heads/master 8256ad1b3 -> 14be7a1d5


MAILBOX-285 Cassandra MessageMapper should support concurrent flags updates


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/16bba839
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/16bba839
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/16bba839

Branch: refs/heads/master
Commit: 16bba839c2f104098c5db000416dfa282c0209df
Parents: f8eb73e
Author: Benoit Tellier <btellier@linagora.com>
Authored: Wed Jan 11 18:28:48 2017 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Tue Jan 17 10:44:40 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 17 +++---
 .../store/mail/model/MessageMapperTest.java     | 57 ++++++++++++++++++++
 2 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/16bba839/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index e6f1ed7..51f2528 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -42,6 +42,7 @@ import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUp
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.UpdatedFlags;
@@ -335,11 +336,7 @@ public class CassandraMessageMapper implements MessageMapper {
             return Optional.of(
                 new FunctionRunnerWithRetry(maxRetries)
                     .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox,
-                            ComposedMessageIdWithMetaData.builder()
-                                .composedMessageId(new ComposedMessageId(mailbox.getMailboxId(),
message.getMessageId(), message.getUid()))
-                                .modSeq(message.getModSeq())
-                                .flags(message.createFlags())
-                            .build(),
+                            message.getMessageId(),
                             flagUpdateCalculator)));
         } catch (MessageDeletedDuringFlagsUpdateException e) {
             mailboxSession.getLog().warn(e.getMessage());
@@ -349,13 +346,17 @@ public class CassandraMessageMapper implements MessageMapper {
         }
     }
 
-    private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, ComposedMessageIdWithMetaData
composedMessageIdWithMetaData, FlagsUpdateCalculator flagUpdateCalculator) {
-        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+    private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, MessageId
messageId, FlagsUpdateCalculator flagUpdateCalculator) {
+        CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = imapUidDAO.retrieve((CassandraMessageId)
messageId, Optional.of(cassandraId))
+            .join()
+            .findFirst()
+            .orElseThrow(MailboxDeleteDuringUpdate::new);
         return tryMessageFlagsUpdate(flagUpdateCalculator,
                 mailbox,
                 messageDAO.retrieveMessages(ImmutableList.of(composedMessageIdWithMetaData),
FetchType.Metadata, Optional.empty()).join()
                     .findFirst()
                     .map(pair -> pair.getLeft().toMailboxMessage(ImmutableList.of()))
-                    .orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(mailboxId,
(CassandraMessageId) composedMessageIdWithMetaData.getComposedMessageId().getMessageId())));
+                    .orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(cassandraId,
(CassandraMessageId) messageId)));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/16bba839/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index ec87df8..a4e6053 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -28,6 +28,10 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.mail.Flags;
 import javax.mail.util.SharedByteArrayInputStream;
@@ -62,6 +66,37 @@ import com.google.common.collect.Lists;
 @Contract(MapperProvider.class)
 public class MessageMapperTest<T extends MapperProvider> {
 
+    private class ConcurrentSetFlagTestRunnable implements Runnable {
+        private final int threadNumber;
+        private final int updateCount;
+        private final Mailbox mailbox;
+        private final MessageUid uid;
+        private final CountDownLatch countDownLatch;
+
+        public ConcurrentSetFlagTestRunnable(int threadNumber, int updateCount, Mailbox mailbox,
MessageUid uid, CountDownLatch countDownLatch) {
+            this.threadNumber = threadNumber;
+            this.updateCount = updateCount;
+            this.mailbox = mailbox;
+            this.uid = uid;
+            this.countDownLatch = countDownLatch;
+        }
+
+        @Override
+        public void run() {
+            countDownLatch.countDown();
+            for (int i = 0; i < updateCount; i++) {
+                try {
+                    messageMapper.updateFlags(mailbox,
+                        new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-"
+ i),
+                            FlagsUpdateMode.ADD),
+                        MessageRange.one(uid));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
     private final static char DELIMITER = '.';
     private static final int LIMIT = 10;
     private static final int BODY_START = 16;
@@ -695,6 +730,28 @@ public class MessageMapperTest<T extends MapperProvider> {
     }
 
     @ContractTest
+    public void userFlagsUpdateShouldWorkInConcurrentEnvironment() throws Exception {
+        saveMessages();
+
+        int threadCount = 2;
+        int updateCount = 10;
+        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+        for (int i = 0; i < threadCount; i++) {
+            executorService.submit(new ConcurrentSetFlagTestRunnable(i, updateCount,
+                benwaInboxMailbox, message1.getUid(), countDownLatch));
+        }
+        executorService.shutdown();
+        assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES))
+            .isTrue();
+
+        Iterator<MailboxMessage> messages = messageMapper.findInMailbox(benwaInboxMailbox,
MessageRange.one(message1.getUid()),
+            FetchType.Metadata, 1);
+        assertThat(messages.hasNext()).isTrue();
+        assertThat(messages.next().createFlags().getUserFlags()).hasSize(threadCount * updateCount);
+    }
+
+    @ContractTest
     public void messagesShouldBeSavedWithTheirUserFlags() throws Exception {
         MailboxMessage message = SimpleMailboxMessage.copy(benwaInboxMailbox.getMailboxId(),
message1);
         messageMapper.add(benwaInboxMailbox, message);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message