james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [3/4] james-project git commit: JAMES-1945 Parallelize UID update in CassandraUidProvider
Date Thu, 23 Feb 2017 03:39:03 GMT
JAMES-1945 Parallelize UID update in CassandraUidProvider


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6d94a8fe
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6d94a8fe
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6d94a8fe

Branch: refs/heads/master
Commit: 6d94a8fe624160ee0dc0e42ed8be51c570347cc0
Parents: f2b7cd7
Author: Benoit Tellier <btellier@linagora.com>
Authored: Mon Feb 20 11:58:13 2017 +0700
Committer: benwa <btellier@linagora.com>
Committed: Thu Feb 23 10:38:02 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraUidProvider.java    | 145 +++++++++++--------
 .../CassandraSubscriptionManagerTest.java       |   7 +-
 2 files changed, 87 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6d94a8fe/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index a6bd980..4aeb483 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -19,46 +19,71 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
-import org.apache.james.backends.cassandra.utils.CassandraConstants;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.CassandraId;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.james.util.OptionalConverter;
 
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.BuiltStatement;
-import com.google.common.base.Throwables;
 
 public class CassandraUidProvider implements UidProvider {
-    public final static int DEFAULT_MAX_RETRY = 100000;
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraUidProvider.class);
+    private static final int DEFAULT_MAX_RETRY = 100000;
+    private static final String CONDITION = "Condition";
 
-    private final Session session;
+    private final CassandraAsyncExecutor executor;
     private final FunctionRunnerWithRetry runner;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement updateStatement;
+    private final PreparedStatement selectStatement;
 
     public CassandraUidProvider(Session session, int maxRetry) {
-        this.session = session;
+        this.executor = new CassandraAsyncExecutor(session);
         this.runner = new FunctionRunnerWithRetry(maxRetry);
+        this.selectStatement = prepareSelect(session);
+        this.updateStatement = prepareUpdate(session);
+        this.insertStatement = prepareInsert(session);
+    }
+
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select(NEXT_UID)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+    }
+
+    private PreparedStatement prepareUpdate(Session session) {
+        return session.prepare(update(TABLE_NAME)
+            .onlyIf(eq(NEXT_UID, bindMarker(CONDITION)))
+            .with(set(NEXT_UID, bindMarker(NEXT_UID)))
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(NEXT_UID, MessageUid.MIN_VALUE.asLong())
+            .value(MAILBOX_ID, bindMarker(MAILBOX_ID))
+            .ifNotExists());
     }
 
     @Inject
@@ -74,75 +99,67 @@ public class CassandraUidProvider implements UidProvider {
     @Override
     public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException
{
         CassandraId cassandraId = (CassandraId) mailboxId;
-        if (! findHighestUid(cassandraId).isPresent()) {
-            Optional<MessageUid> optional = tryInsertUid(cassandraId, Optional.empty());
-            if (optional.isPresent()) {
-                return optional.get();
-            }
-        }
+        return nextUid(cassandraId)
+        .join()
+        .orElseThrow(() -> new MailboxException("Error during Uid update"));
+    }
 
-        try {
-            return runner.executeAndRetrieveObject(
-                () -> {
-                    try {
-                        return tryUpdateUid(cassandraId, findHighestUid(cassandraId));
-                    } catch (Exception exception) {
-                        LOG.error("Can not retrieve next Uid", exception);
-                        throw Throwables.propagate(exception);
-                    }
-                });
-        } catch (LightweightTransactionException e) {
-            throw new MailboxException("Error during Uid update", e);
-        }
+    public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId)
{
+        return findHighestUid(cassandraId)
+            .thenCompose(optional -> {
+                if (optional.isPresent()) {
+                    return tryUpdateUid(cassandraId, optional);
+                }
+                return tryInsert(cassandraId);
+            })
+            .thenCompose(optional -> {
+                if (optional.isPresent()) {
+                    return CompletableFuture.completedFuture(optional);
+                }
+                return runner.executeAsyncAndRetrieveObject(
+                    () -> findHighestUid(cassandraId)
+                        .thenCompose(readUid -> tryUpdateUid(cassandraId, readUid)));
+            });
     }
 
     @Override
     public com.google.common.base.Optional<MessageUid> lastUid(MailboxSession mailboxSession,
Mailbox mailbox) throws MailboxException {
-        return findHighestUid((CassandraId) mailbox.getMailboxId());
-    }
-
-    private com.google.common.base.Optional<MessageUid> findHighestUid(CassandraId
mailboxId) throws MailboxException {
-        ResultSet result = session.execute(
-            select(NEXT_UID)
-                .from(CassandraMessageUidTable.TABLE_NAME)
-                .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
-        if (result.isExhausted()) {
-            return com.google.common.base.Optional.absent();
-        } else {
-            return com.google.common.base.Optional.of(MessageUid.of(result.one().getLong(NEXT_UID)));
-        }
+        return OptionalConverter.toGuava(findHighestUid((CassandraId) mailbox.getMailboxId()).join());
     }
 
-    private Optional<MessageUid> tryInsertUid(CassandraId mailboxId, Optional<MessageUid>
uid) {
-        MessageUid nextUid = uid.map(MessageUid::next).orElse(MessageUid.MIN_VALUE);
-        return transactionalStatementToOptionalUid(nextUid,
-            insertInto(CassandraMessageUidTable.TABLE_NAME)
-                .value(NEXT_UID, nextUid.asLong())
-                .value(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())
-                .ifNotExists());
+    private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId
mailboxId) {
+        return executor.executeSingleRow(
+            selectStatement.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid()))
+            .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID))));
     }
 
-    private Optional<MessageUid> tryUpdateUid(CassandraId mailboxId, com.google.common.base.Optional<MessageUid>
uid) {
+    private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId
mailboxId, Optional<MessageUid> uid) {
         if (uid.isPresent()) {
             MessageUid nextUid = uid.get().next();
-            return transactionalStatementToOptionalUid(nextUid,
-                    update(CassandraMessageUidTable.TABLE_NAME)
-                        .onlyIf(eq(NEXT_UID, uid.get().asLong()))
-                        .with(set(NEXT_UID, nextUid.asLong()))
-                        .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
+            return executor.executeReturnApplied(
+                updateStatement.bind()
+                    .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                    .setLong(CONDITION, uid.get().asLong())
+                    .setLong(NEXT_UID, nextUid.asLong()))
+                .thenApply(success -> successToUid(nextUid, success));
         } else {
-            return transactionalStatementToOptionalUid(MessageUid.MIN_VALUE,
-                    update(CassandraMessageUidTable.TABLE_NAME)
-                    .onlyIf(eq(NEXT_UID, null))
-                    .with(set(NEXT_UID, MessageUid.MIN_VALUE.asLong()))
-                    .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
+            return tryInsert(mailboxId);
         }
     }
 
-    private Optional<MessageUid> transactionalStatementToOptionalUid(MessageUid uid,
BuiltStatement statement) {
-        if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED))
{
+    private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId)
{
+        return executor.executeReturnApplied(
+            insertStatement.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid()))
+            .thenApply(success -> successToUid(MessageUid.MIN_VALUE, success));
+    }
+
+    private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
+        if (success) {
             return Optional.of(uid);
         }
         return Optional.empty();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/6d94a8fe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index b0b334e..0a3838d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -34,7 +34,9 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.modules.CassandraMailboxCounterModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraSubscriptionModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraUidModule;
 
 /**
  * Test Cassandra subscription against some general purpose written code.
@@ -43,7 +45,10 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage
 
     private static final CassandraCluster cassandra = CassandraCluster.create(
         new CassandraModuleComposite(
-            new CassandraSubscriptionModule(), new CassandraMailboxCounterModule()));
+            new CassandraSubscriptionModule(),
+            new CassandraMailboxCounterModule(),
+            new CassandraUidModule(),
+            new CassandraModSeqModule()));
     
     @Override
     public SubscriptionManager createSubscriptionManager() {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message