james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1664983 - in /james/mailbox/trunk/cassandra/src: main/java/org/apache/james/mailbox/cassandra/ main/java/org/apache/james/mailbox/cassandra/mail/ main/java/org/apache/james/mailbox/cassandra/table/ test/java/org/apache/james/mailbox/cassan...
Date Sun, 08 Mar 2015 12:09:42 GMT
Author: eric
Date: Sun Mar  8 12:09:42 2015
New Revision: 1664983

URL: http://svn.apache.org/r1664983
Log:
Data races are possible while updating flags in Cassandra, patch provided by Benoit Tellier
(MAILBOX-206)

Modified:
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
    james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
    james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
(original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
Sun Mar  8 12:09:42 2015
@@ -57,7 +57,7 @@ public class CassandraSession implements
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".messageCounter (" +
"mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailboxCounters (" +
"mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter"
+ ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message (" + "mailboxId
UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob,"
+ "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int," + "bodyOctets
int,"
-                + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob,"
+ "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean,"
+ "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," + "PRIMARY KEY (mailboxId,
uid)" + ");");
+                + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob,"
+ "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean,"
+ "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," + "flagVersion bigint,"
+ "PRIMARY KEY (mailboxId, uid)" + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".subscription (" + "user
text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
         session.close();
     }

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
(original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
Sun Mar  8 12:09:42 2015
@@ -29,6 +29,7 @@ import static com.datastax.driver.core.q
 import static com.datastax.driver.core.querybuilder.QueryBuilder.lt;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_CONTENT;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_OCTECTS;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET;
@@ -51,6 +52,7 @@ import static org.apache.james.mailbox.c
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.RECENT;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.SEEN;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.USER;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FLAG_VERSION;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -64,6 +66,7 @@ import javax.mail.Flags;
 import javax.mail.Flags.Flag;
 import javax.mail.util.SharedByteArrayInputStream;
 
+import com.datastax.driver.core.querybuilder.Update;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
@@ -106,15 +109,25 @@ public class CassandraMessageMapper impl
     private MailboxSession mailboxSession;
     private UidProvider<UUID> uidProvider;
 
+    private final int applied = 0;
+    private int maxRetries;
+
+    private final static int DEFAULT_MAX_RETRIES = 10000;
+
     public CassandraMessageMapper(Session session, CassandraUidProvider uidProvider, ModSeqProvider<UUID>
modSeqProvider) {
+        this(session, uidProvider, modSeqProvider, null, DEFAULT_MAX_RETRIES);
+    }
+
+    public CassandraMessageMapper(Session session, CassandraUidProvider uidProvider, ModSeqProvider<UUID>
modSeqProvider, MailboxSession mailboxSession) {
+        this(session, uidProvider, modSeqProvider, mailboxSession, DEFAULT_MAX_RETRIES);
+    }
+
+    public CassandraMessageMapper(Session session, CassandraUidProvider uidProvider, ModSeqProvider<UUID>
modSeqProvider, MailboxSession mailboxSession, int maxRetries) {
         this.session = session;
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
-    }
-
-    public CassandraMessageMapper(Session session, CassandraUidProvider uidProvider, CassandraModSeqProvider
modSeqProvider, MailboxSession mailboxSession) {
-        this(session, uidProvider, modSeqProvider);
         this.mailboxSession = mailboxSession;
+        this.maxRetries = maxRetries;
     }
 
     @Override
@@ -311,7 +324,7 @@ public class CassandraMessageMapper impl
             Insert query = insertInto(TABLE_NAME).value(MAILBOX_ID, mailbox.getMailboxId()).value(IMAP_UID,
message.getUid()).value(MOD_SEQ, message.getModSeq()).value(INTERNAL_DATE, message.getInternalDate()).value(MEDIA_TYPE,
message.getMediaType())
                     .value(BODY_START_OCTET, message.getFullContentOctets() - message.getBodyOctets()).value(SUB_TYPE,
message.getSubType()).value(FULL_CONTENT_OCTETS, message.getFullContentOctets()).value(BODY_OCTECTS,
message.getBodyOctets()).value(ANSWERED, message.isAnswered())
                     .value(DELETED, message.isDeleted()).value(DRAFT, message.isDraft()).value(FLAGGED,
message.isFlagged()).value(RECENT, message.isRecent()).value(SEEN, message.isSeen()).value(USER,
message.createFlags().contains(Flag.USER)).value(BODY_CONTENT, bindMarker())
-                    .value(HEADER_CONTENT, bindMarker()).value(TEXTUAL_LINE_COUNT, message.getTextualLineCount());
+                    .value(HEADER_CONTENT, bindMarker()).value(TEXTUAL_LINE_COUNT, message.getTextualLineCount()).value(FLAG_VERSION,
0);
             PreparedStatement preparedStatement = session.prepare(query.toString());
             BoundStatement boundStatement = preparedStatement.bind(toByteBuffer(message.getBodyContent()),
toByteBuffer(message.getHeaderContent()));
             session.execute(boundStatement);
@@ -321,28 +334,106 @@ public class CassandraMessageMapper impl
         }
     }
 
+    private boolean conditionalSave(Mailbox<UUID> mailbox, Message<UUID> message,
long flagVersion) throws MailboxException {
+        ResultSet resultSet = session.execute(
+                update(TABLE_NAME)
+                        .with(set(ANSWERED, message.isAnswered()))
+                        .and(set(DELETED, message.isDeleted()))
+                        .and(set(DRAFT, message.isDraft()))
+                        .and(set(FLAGGED, message.isFlagged()))
+                        .and(set(RECENT, message.isRecent()))
+                        .and(set(SEEN, message.isSeen()))
+                        .and(set(USER, message.createFlags().contains(Flag.USER)))
+                        .and(set(FLAG_VERSION, flagVersion + 1))
+                        .where(eq(IMAP_UID, message.getUid()))
+                        .and(eq(MAILBOX_ID, message.getMailboxId()))
+                        .onlyIf(eq(FLAG_VERSION, flagVersion))
+        );
+        return resultSet.one().getBool(applied);
+    }
+
     private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
         return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
     }
 
+    /**
+     *
+     * @param mailbox Mailbox where messages are located
+     * @param flags Flags used for update
+     * @param value True if you want to set provided flags to true, false to set these flags
to false
+     * @param replace true if we want to replace current flags by those provided
+     * @param set Range of messages to update
+     * @return Updated flag information
+     * @throws MailboxException
+     */
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox<UUID> mailbox, Flags flags,
boolean value, boolean replace, MessageRange set) throws MailboxException {
         ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
         for (Row row : session.execute(buildQuery(mailbox, set))) {
-            Message<UUID> message = message(row);
-            Flags originFlags = message.createFlags();
-            Flags updatedFlags = buildFlags(message, flags, value, replace);
-            message.setFlags(updatedFlags);
-            message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
-            save(mailbox, message);
-            result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), originFlags,
updatedFlags));
+            updateMessage(mailbox, flags, value, replace, result, row);
         }
         return result.build().iterator();
     }
 
+    private void updateMessage(Mailbox<UUID> mailbox, Flags flags, boolean value, boolean
replace, ImmutableList.Builder<UpdatedFlags> result, Row row) throws MailboxException
{
+        // Get the message and basic information about it
+        Message<UUID> message = message(row);
+        long flagVersion = row.getLong(FLAG_VERSION);
+        long uid = message.getUid();
+        // update flags
+        Flags originFlags = message.createFlags();
+        Flags updatedFlags = buildFlags(message, flags, value, replace);
+        message.setFlags(updatedFlags);
+        // Update the ModSeq
+        long previousModSeq = message.getModSeq();
+        long modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+        message.setModSeq(modSeq);
+        // Try a first update
+        if(!conditionalSave(mailbox, message, flagVersion)) {
+            int tries = 0;
+            // It fails. Someone updated the flag before us.
+            do {
+                tries++;
+                // Retrieve the message from uid
+                Row newRow = findMessageByUid(mailbox, uid);
+                if(newRow == null) {
+                    // Someone deleted this result while we were doing other stuff
+                    // Skip it
+                    break;
+                }
+                message = message(newRow);
+                flagVersion = newRow.getLong(FLAG_VERSION);
+                // update flags
+                originFlags = message.createFlags();
+                updatedFlags = buildFlags(message, flags, value, replace);
+                message.setFlags(updatedFlags);
+                // Update ModSeq
+                if (previousModSeq != message.getModSeq()) {
+                    // Here someone updated the ModSeq, so we cannot use the previously
generated value
+                    previousModSeq = message.getModSeq();
+                    modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+                }
+                message.setModSeq(modSeq);
+                // and retry
+            } while (!conditionalSave(mailbox, message, flagVersion) && tries <
maxRetries);
+            if(tries == maxRetries) {
+                throw new MailboxException("Max retries reached when asking an update of
flags on message " + uid + " for mailbox " + mailbox.getMailboxId());
+            }
+        }
+        result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), originFlags, updatedFlags));
+    }
+
+    private Row findMessageByUid(Mailbox<UUID> mailbox, long uid) {
+        ResultSet resultSet = session.execute(selectMessage(mailbox, uid));
+        if ( resultSet.isExhausted() ) {
+            return null;
+        }
+        return resultSet.one();
+    }
+
     private Flags buildFlags(Message<UUID> message, Flags flags, boolean value, boolean
replace) {
         if (replace) {
-            return message.createFlags();
+            return flags;
         } else {
             Flags updatedFlags = message.createFlags();
             if (value) {

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
(original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
Sun Mar  8 12:09:42 2015
@@ -38,7 +38,8 @@ public interface CassandraMessageTable {
     String TEXTUAL_LINE_COUNT = "textualLineCount";
     String BODY_CONTENT = "bodyContent";
     String HEADER_CONTENT = "headerContent";
-    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, MEDIA_TYPE,
SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED,
Flag.RECENT, Flag.SEEN, Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT };
+    String FLAG_VERSION = "flagVersion";
+    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, MEDIA_TYPE,
SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED,
Flag.RECENT, Flag.SEEN, Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, FLAG_VERSION
};
 
     interface Flag {
         String ANSWERED = "flagAnswered";

Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
(original)
+++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
Sun Mar  8 12:09:42 2015
@@ -91,7 +91,7 @@ public final class CassandraClusterSingl
         } else if (tableName.equals("message")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() +
".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet
int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets
int,"
                     + "bodyOctets int," + "textualLineCount bigint," + "bodyContent blob,"
+ "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean,"
+ "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean,"
-                    + "PRIMARY KEY (mailboxId, uid)" + ");");
+                    + "flagVersion bigint,"+ "PRIMARY KEY (mailboxId, uid)" + ");");
         } else if (tableName.equals("subscription")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() +
".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
         } else {

Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
(original)
+++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
Sun Mar  8 12:09:42 2015
@@ -19,9 +19,12 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -32,8 +35,11 @@ import javax.mail.util.SharedByteArrayIn
 
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.mock.MockMailboxSession;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.Message;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -136,6 +142,69 @@ public class CassandraMessageMapperTest
         testAdd();
         testGetLastUid();
         testGetHighestModSeq();
+        testMessageUpdateReplace();
+        testMessageUpdateAddition();
+    }
+
+    /**
+     * Test message flag replacement
+     */
+    private void testMessageUpdateReplace() throws MailboxException {
+        LOG.info("message update : replace flags");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.ANSWERED);
+        flags.add(Flags.Flag.DRAFT);
+        messageMapper.updateFlags(MBOXES.get(1), flags, true, true, MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1),
MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertTrue(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertFalse(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag set to true
+     */
+    private void testMessageUpdateAddition() throws MailboxException {
+        LOG.info("message update : flag addition");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.FLAGGED);
+        messageMapper.updateFlags(MBOXES.get(1), flags, true, false, MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1),
MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertTrue(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertTrue(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag removal
+     */
+    private void testMessageUpdateRemove() throws MailboxException {
+        LOG.info("message update : flag removal");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.ANSWERED);
+        messageMapper.updateFlags(MBOXES.get(1), flags, false, false, MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1),
MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertFalse(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertTrue(message.isFlagged());
+        }
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message