james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From btell...@apache.org
Subject [12/17] james-project git commit: JAMES-2555 Wrap reindexing into a task
Date Mon, 15 Oct 2018 06:28:35 GMT
JAMES-2555 Wrap reindexing into a task


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7c4f4d86
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7c4f4d86
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7c4f4d86

Branch: refs/heads/master
Commit: 7c4f4d86367d3fc61fcf4854280137551bd6d14b
Parents: 884b88f
Author: Benoit Tellier <btellier@linagora.com>
Authored: Thu Oct 11 16:22:57 2018 +0700
Committer: Benoit Tellier <btellier@linagora.com>
Committed: Mon Oct 15 13:17:46 2018 +0700

----------------------------------------------------------------------
 mailbox/api/pom.xml                             |   4 +
 .../apache/james/mailbox/indexer/ReIndexer.java |   5 +-
 .../tools/indexer/FullReindexingTask.java       |  79 +++++++++
 .../mailbox/tools/indexer/ReIndexerImpl.java    | 131 +--------------
 .../tools/indexer/ReIndexerPerformer.java       | 168 +++++++++++++++++++
 .../tools/indexer/ReprocessingContext.java      |  53 ++++++
 .../indexer/SingleMailboxReindexingTask.java    |  88 ++++++++++
 .../mailbox/tools/indexer/ThrowsReIndexer.java  |   5 +-
 .../indexer/CassandraReIndexerImplTest.java     |   4 +-
 .../tools/indexer/ReIndexerImplTest.java        |   6 +-
 .../cli/ReindexCommandIntegrationTest.java      |   7 +-
 .../adapter/mailbox/ReIndexerManagement.java    |  11 +-
 .../META-INF/org/apache/james/spring-server.xml |   3 +
 13 files changed, 428 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/api/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/api/pom.xml b/mailbox/api/pom.xml
index 18578c1..56237b3 100644
--- a/mailbox/api/pom.xml
+++ b/mailbox/api/pom.xml
@@ -38,6 +38,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
index 302e01c..94304a7 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
@@ -21,11 +21,12 @@ package org.apache.james.mailbox.indexer;
 
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Task;
 
 public interface ReIndexer {
 
-    void reIndex(MailboxPath path) throws MailboxException;
+    Task reIndex(MailboxPath path) throws MailboxException;
 
-    void reIndex() throws MailboxException;
+    Task reIndex() throws MailboxException;
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
new file mode 100644
index 0000000..196c1d4
--- /dev/null
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
@@ -0,0 +1,79 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.mailbox.tools.indexer;
+
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+
+public class FullReindexingTask implements Task {
+
+    public static final String FULL_RE_INDEXING = "FullReIndexing";
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation
{
+        private final ReprocessingContext reprocessingContext;
+
+        AdditionalInformation(ReprocessingContext reprocessingContext) {
+            this.reprocessingContext = reprocessingContext;
+        }
+
+        public int getSuccessfullyReprocessMailCount() {
+            return reprocessingContext.successfullyReprocessedMailCount();
+        }
+
+        public int getFailedReprocessedMailCount() {
+            return reprocessingContext.failedReprocessingMailCount();
+        }
+    }
+
+    private final ReIndexerPerformer reIndexerPerformer;
+    private final AdditionalInformation additionalInformation;
+    private final ReprocessingContext reprocessingContext;
+
+    @Inject
+    public FullReindexingTask(ReIndexerPerformer reIndexerPerformer) {
+        this.reIndexerPerformer = reIndexerPerformer;
+        this.reprocessingContext = new ReprocessingContext();
+        this.additionalInformation = new AdditionalInformation(reprocessingContext);
+    }
+
+    @Override
+    public Result run() {
+        try {
+            return reIndexerPerformer.reIndex(reprocessingContext);
+        } catch (MailboxException e) {
+            return Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public String type() {
+        return FULL_RE_INDEXING;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(additionalInformation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
index f239e32..415aacc 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
@@ -19,33 +19,11 @@
 
 package org.apache.mailbox.tools.indexer;
 
-import java.util.List;
-import java.util.Optional;
-
 import javax.inject.Inject;
 
-import org.apache.james.mailbox.MailboxManager;
-import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.model.MailboxPath;
-import org.apache.james.mailbox.model.MessageRange;
-import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
-import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.Mailbox;
-import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
-import org.apache.james.util.streams.Iterators;
-import org.apache.mailbox.tools.indexer.events.ImpactingEventType;
-import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent;
-import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration;
-import org.apache.mailbox.tools.indexer.registrations.MailboxRegistration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.collect.Lists;
+import org.apache.james.task.Task;
 
 /**
  * Note about live re-indexation handling :
@@ -61,114 +39,21 @@ import com.google.common.collect.Lists;
  */
 public class ReIndexerImpl implements ReIndexer {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerImpl.class);
-
-    private static final int NO_LIMIT = 0;
-    private static final int SINGLE_MESSAGE = 1;
-
-    private final MailboxManager mailboxManager;
-    private final ListeningMessageSearchIndex messageSearchIndex;
-    private final MailboxSessionMapperFactory mailboxSessionMapperFactory;
+    private final ReIndexerPerformer reIndexerPerformer;
 
     @Inject
-    public ReIndexerImpl(MailboxManager mailboxManager,
-                         ListeningMessageSearchIndex messageSearchIndex,
-                         MailboxSessionMapperFactory mailboxSessionMapperFactory) {
-        this.mailboxManager = mailboxManager;
-        this.messageSearchIndex = messageSearchIndex;
-        this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
+    public ReIndexerImpl(ReIndexerPerformer reIndexerPerformer) {
+        this.reIndexerPerformer = reIndexerPerformer;
     }
 
     @Override
-    public void reIndex(MailboxPath path) throws MailboxException {
-        MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser());
-        reIndex(path, mailboxSession);
+    public Task reIndex(MailboxPath path) {
+        return new SingleMailboxReindexingTask(reIndexerPerformer, path);
     }
 
-
     @Override
-    public void reIndex() throws MailboxException {
-        MailboxSession mailboxSession = mailboxManager.createSystemSession("re-indexing");
-        LOGGER.info("Starting a full reindex");
-        List<MailboxPath> mailboxPaths = mailboxManager.list(mailboxSession);
-        GlobalRegistration globalRegistration = new GlobalRegistration();
-        mailboxManager.addGlobalListener(globalRegistration, mailboxSession);
-        try {
-            handleFullReindexingIterations(mailboxPaths, globalRegistration);
-        } finally {
-            mailboxManager.removeGlobalListener(globalRegistration, mailboxSession);
-        }
-        LOGGER.info("Full reindex finished");
-    }
-
-    private void handleFullReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration
globalRegistration) {
-        for (MailboxPath mailboxPath : mailboxPaths) {
-            Optional<MailboxPath> pathToIndex = globalRegistration.getPathToIndex(mailboxPath);
-            if (pathToIndex.isPresent()) {
-                try {
-                    reIndex(pathToIndex.get());
-                } catch (Throwable e) {
-                    LOGGER.error("Error while proceeding to full reindexing on {}", pathToIndex.get(),
e);
-                }
-            }
-        }
-    }
-
-
-    private void reIndex(MailboxPath path, MailboxSession mailboxSession) throws MailboxException
{
-        MailboxRegistration mailboxRegistration = new MailboxRegistration(path);
-        LOGGER.info("Intend to reindex {}",path);
-        Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
-        messageSearchIndex.deleteAll(mailboxSession, mailbox);
-        mailboxManager.addListener(path, mailboxRegistration, mailboxSession);
-        try {
-            Iterators.toStream(
-                mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-                    .findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata,
NO_LIMIT))
-                .map(MailboxMessage::getUid)
-                .forEach(uid -> handleMessageReIndexing(mailboxSession, mailboxRegistration,
mailbox, uid));
-            LOGGER.info("Finish to reindex {}", path);
-        } finally {
-            mailboxManager.removeListener(path, mailboxRegistration, mailboxSession);
-        }
-    }
-
-    private void handleMessageReIndexing(MailboxSession mailboxSession, MailboxRegistration
mailboxRegistration, Mailbox mailbox, MessageUid uid) {
-        try {
-            Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(uid));
-
-            Optional.of(uid)
-                .filter(x -> !wasDeleted(impactingMessageEvent))
-                .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox,
mUid)))
-                .map(message -> messageUpdateRegardingEvents(message, impactingMessageEvent))
-                .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession,
mailbox, message)));
-        } catch (Exception e) {
-            LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(),
uid, e);
-        }
-    }
-
-    private Optional<ImpactingMessageEvent> findMostRelevant(List<ImpactingMessageEvent>
messageEvents) {
-        for (ImpactingMessageEvent impactingMessageEvent : messageEvents) {
-            if (impactingMessageEvent.getType().equals(ImpactingEventType.Deletion)) {
-                return Optional.of(impactingMessageEvent);
-            }
-        }
-        return Lists.reverse(messageEvents).stream().findFirst();
-    }
-
-    private boolean wasDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent)
{
-        return impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false);
-    }
-
-    private MailboxMessage messageUpdateRegardingEvents(MailboxMessage message, Optional<ImpactingMessageEvent>
impactingMessageEvent) {
-        impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags);
-        return message;
-    }
-
-    private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession,
Mailbox mailbox, MessageUid mUid) throws MailboxException {
-        return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-            .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full,
SINGLE_MESSAGE))
-            .findFirst();
+    public Task reIndex() {
+        return new FullReindexingTask(reIndexerPerformer);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
new file mode 100644
index 0000000..e70b88d
--- /dev/null
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -0,0 +1,168 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.mailbox.tools.indexer;
+
+import java.util.List;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.task.Task;
+import org.apache.james.util.OptionalUtils;
+import org.apache.james.util.streams.Iterators;
+import org.apache.mailbox.tools.indexer.events.ImpactingEventType;
+import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent;
+import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration;
+import org.apache.mailbox.tools.indexer.registrations.MailboxRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.Lists;
+
+public class ReIndexerPerformer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
+
+    private static final int NO_LIMIT = 0;
+    private static final int SINGLE_MESSAGE = 1;
+
+    private final MailboxManager mailboxManager;
+    private final ListeningMessageSearchIndex messageSearchIndex;
+    private final MailboxSessionMapperFactory mailboxSessionMapperFactory;
+
+    @Inject
+    public ReIndexerPerformer(MailboxManager mailboxManager,
+                              ListeningMessageSearchIndex messageSearchIndex,
+                              MailboxSessionMapperFactory mailboxSessionMapperFactory) {
+        this.mailboxManager = mailboxManager;
+        this.messageSearchIndex = messageSearchIndex;
+        this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
+    }
+
+    public Task.Result reIndex(MailboxPath path, ReprocessingContext reprocessingContext)
throws MailboxException {
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser());
+        return reIndex(path, mailboxSession, reprocessingContext);
+    }
+
+    public Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException
{
+        MailboxSession mailboxSession = mailboxManager.createSystemSession("re-indexing");
+        LOGGER.info("Starting a full reindex");
+        List<MailboxPath> mailboxPaths = mailboxManager.list(mailboxSession);
+        GlobalRegistration globalRegistration = new GlobalRegistration();
+        mailboxManager.addGlobalListener(globalRegistration, mailboxSession);
+        try {
+            return handleFullReindexingIterations(mailboxPaths, globalRegistration, reprocessingContext);
+        } finally {
+            mailboxManager.removeGlobalListener(globalRegistration, mailboxSession);
+            LOGGER.info("Full reindex finished");
+        }
+    }
+
+    private Task.Result handleFullReindexingIterations(List<MailboxPath> mailboxPaths,
GlobalRegistration globalRegistration,
+                                                       ReprocessingContext reprocessingContext)
{
+        return mailboxPaths.stream()
+            .map(globalRegistration::getPathToIndex)
+            .flatMap(OptionalUtils::toStream)
+            .map(path -> {
+                try {
+                    return reIndex(path, reprocessingContext);
+                } catch (Throwable e) {
+                    LOGGER.error("Error while proceeding to full reindexing on {}", path.asString(),
e);
+                    return Task.Result.PARTIAL;
+                }
+            })
+            .reduce(Task::combine)
+            .orElse(Task.Result.COMPLETED);
+    }
+
+    private Task.Result reIndex(MailboxPath path, MailboxSession mailboxSession, ReprocessingContext
reprocessingContext) throws MailboxException {
+        MailboxRegistration mailboxRegistration = new MailboxRegistration(path);
+        LOGGER.info("Intend to reindex {}", path.asString());
+        Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
+        messageSearchIndex.deleteAll(mailboxSession, mailbox);
+        mailboxManager.addListener(path, mailboxRegistration, mailboxSession);
+        try {
+            return Iterators.toStream(
+                mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+                    .findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata,
NO_LIMIT))
+                .map(MailboxMessage::getUid)
+                .map(uid -> handleMessageReIndexing(mailboxSession, mailboxRegistration,
mailbox, uid))
+                .peek(reprocessingContext::updateAccordingToReprocessingResult)
+                .reduce(Task::combine)
+                .orElse(Task.Result.COMPLETED);
+        } finally {
+            LOGGER.info("Finish to reindex {}", path.asString());
+            mailboxManager.removeListener(path, mailboxRegistration, mailboxSession);
+        }
+    }
+
+    private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, MailboxRegistration
mailboxRegistration, Mailbox mailbox, MessageUid uid) {
+        try {
+            Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(uid));
+
+            Optional.of(uid)
+                .filter(x -> !wasDeleted(impactingMessageEvent))
+                .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox,
mUid)))
+                .map(message -> messageUpdateRegardingEvents(message, impactingMessageEvent))
+                .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession,
mailbox, message)));
+            return Task.Result.COMPLETED;
+        } catch (Exception e) {
+            LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(),
uid, e);
+            return Task.Result.PARTIAL;
+        }
+    }
+
+    private Optional<ImpactingMessageEvent> findMostRelevant(List<ImpactingMessageEvent>
messageEvents) {
+        for (ImpactingMessageEvent impactingMessageEvent : messageEvents) {
+            if (impactingMessageEvent.getType().equals(ImpactingEventType.Deletion)) {
+                return Optional.of(impactingMessageEvent);
+            }
+        }
+        return Lists.reverse(messageEvents).stream().findFirst();
+    }
+
+    private boolean wasDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent)
{
+        return impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false);
+    }
+
+    private MailboxMessage messageUpdateRegardingEvents(MailboxMessage message, Optional<ImpactingMessageEvent>
impactingMessageEvent) {
+        impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags);
+        return message;
+    }
+
+    private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession,
Mailbox mailbox, MessageUid mUid) throws MailboxException {
+        return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+            .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full,
SINGLE_MESSAGE))
+            .findFirst();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
new file mode 100644
index 0000000..08417cf
--- /dev/null
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.mailbox.tools.indexer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.task.Task;
+
+public class ReprocessingContext {
+    private final AtomicInteger successfullyReprocessedMails;
+    private final AtomicInteger failedReprocessingMails;
+
+    public ReprocessingContext() {
+        failedReprocessingMails = new AtomicInteger(0);
+        successfullyReprocessedMails = new AtomicInteger(0);
+    }
+
+    public void updateAccordingToReprocessingResult(Task.Result result) {
+        switch (result) {
+            case COMPLETED:
+                successfullyReprocessedMails.incrementAndGet();
+                break;
+            case PARTIAL:
+                failedReprocessingMails.incrementAndGet();
+                break;
+        }
+    }
+
+    public int successfullyReprocessedMailCount() {
+        return successfullyReprocessedMails.get();
+    }
+
+    public int failedReprocessingMailCount() {
+        return failedReprocessingMails.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
new file mode 100644
index 0000000..fc9f834
--- /dev/null
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
@@ -0,0 +1,88 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.mailbox.tools.indexer;
+
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+
+public class SingleMailboxReindexingTask implements Task {
+
+    public static final String MAILBOX_RE_INDEXING = "mailboxReIndexing";
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation
{
+        private final MailboxPath mailboxPath;
+        private final ReprocessingContext reprocessingContext;
+
+        AdditionalInformation(MailboxPath mailboxPath, ReprocessingContext reprocessingContext)
{
+            this.mailboxPath = mailboxPath;
+            this.reprocessingContext = reprocessingContext;
+        }
+
+        public String getMailboxPath() {
+            return mailboxPath.asString();
+        }
+
+        public int getSuccessfullyReprocessMailCount() {
+            return reprocessingContext.successfullyReprocessedMailCount();
+        }
+
+        public int getFailedReprocessedMailCount() {
+            return reprocessingContext.failedReprocessingMailCount();
+        }
+    }
+
+    private final ReIndexerPerformer reIndexerPerformer;
+    private final MailboxPath path;
+    private final AdditionalInformation additionalInformation;
+    private final ReprocessingContext reprocessingContext;
+
+    @Inject
+    public SingleMailboxReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxPath
path) {
+        this.reIndexerPerformer = reIndexerPerformer;
+        this.path = path;
+        this.reprocessingContext = new ReprocessingContext();
+        this.additionalInformation = new AdditionalInformation(path, reprocessingContext);
+    }
+
+    @Override
+    public Result run() {
+        try {
+            return reIndexerPerformer.reIndex(path, reprocessingContext);
+        } catch (MailboxException e) {
+            return Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public String type() {
+        return MAILBOX_RE_INDEXING;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(additionalInformation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java
index 462537b..6734795 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java
@@ -22,16 +22,17 @@ package org.apache.mailbox.tools.indexer;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Task;
 
 public class ThrowsReIndexer implements ReIndexer {
 
     @Override
-    public void reIndex(MailboxPath path) throws MailboxException {
+    public Task reIndex(MailboxPath path) throws MailboxException {
         throw new MailboxException("Not implemented");
     }
 
     @Override
-    public void reIndex() throws MailboxException {
+    public Task reIndex() throws MailboxException {
         throw new MailboxException("Not implemented");
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
index 26f68cc..6c55478 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
@@ -64,7 +64,7 @@ public class CassandraReIndexerImplTest {
         mailboxManager = CassandraMailboxManagerProvider.provideMailboxManager(cassandra.getConf(), cassandra.getTypesProvider());
         MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
         messageSearchIndex = mock(ListeningMessageSearchIndex.class);
-        reIndexer = new ReIndexerImpl(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory);
+        reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory));
     }
 
     @Test
@@ -88,7 +88,7 @@ public class CassandraReIndexerImplTest {
             .runSuccessfullyWithin(Duration.ofMinutes(10));
 
         // When We re-index
-        reIndexer.reIndex(INBOX);
+        reIndexer.reIndex(INBOX).run();
 
         // The indexer is called for each message
         verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(Mailbox.class));

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
index 76f8a00..003601e 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
@@ -57,7 +57,7 @@ public class ReIndexerImplTest {
         mailboxManager = new InMemoryIntegrationResources().createMailboxManager(new SimpleGroupMembershipResolver());
         MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
         messageSearchIndex = mock(ListeningMessageSearchIndex.class);
-        reIndexer = new ReIndexerImpl(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory);
+        reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory));
     }
 
     @Test
@@ -69,7 +69,7 @@ public class ReIndexerImplTest {
                 MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                 systemSession);
 
-        reIndexer.reIndex(INBOX);
+        reIndexer.reIndex(INBOX).run();
 
         ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class);
         ArgumentCaptor<Mailbox> mailboxCaptor1 = ArgumentCaptor.forClass(Mailbox.class);
@@ -94,7 +94,7 @@ public class ReIndexerImplTest {
                 MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                 systemSession);
 
-        reIndexer.reIndex();
+        reIndexer.reIndex().run();
         ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class);
         ArgumentCaptor<Mailbox> mailboxCaptor1 = ArgumentCaptor.forClass(Mailbox.class);
         ArgumentCaptor<Mailbox> mailboxCaptor2 = ArgumentCaptor.forClass(Mailbox.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java
----------------------------------------------------------------------
diff --git a/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java b/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java
index bccaa62..6d81781 100644
--- a/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java
+++ b/server/container/cli-integration/src/test/java/org/apache/james/cli/ReindexCommandIntegrationTest.java
@@ -19,8 +19,10 @@
 
 package org.apache.james.cli;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.apache.james.GuiceJamesServer;
 import org.apache.james.MemoryJmapTestRule;
@@ -29,6 +31,7 @@ import org.apache.james.mailbox.model.MailboxConstants;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.modules.server.JMXServerModule;
+import org.apache.james.task.Task;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -37,7 +40,7 @@ import org.junit.Test;
 import com.google.inject.name.Names;
 
 public class ReindexCommandIntegrationTest {
-    public static final String USER = "user";
+    private static final String USER = "user";
     private ReIndexer reIndexer;
 
     @Rule
@@ -47,6 +50,8 @@ public class ReindexCommandIntegrationTest {
     @Before
     public void setUp() throws Exception {
         reIndexer = mock(ReIndexer.class);
+        when(reIndexer.reIndex()).thenReturn(() -> Task.Result.COMPLETED);
+        when(reIndexer.reIndex(any(MailboxPath.class))).thenReturn(() -> Task.Result.COMPLETED);
         guiceJamesServer = memoryJmap.jmapServer(new JMXServerModule(),
             binder -> binder.bind(ListeningMessageSearchIndex.class).toInstance(mock(ListeningMessageSearchIndex.class)))
             .overrideWith(binder -> binder.bind(ReIndexer.class)

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
----------------------------------------------------------------------
diff --git a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
index c5110a8..760b3d9 100644
--- a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
+++ b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
@@ -28,14 +28,18 @@ import javax.inject.Named;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
 import org.apache.james.util.MDCBuilder;
 
 public class ReIndexerManagement implements ReIndexerManagementMBean {
 
-    private ReIndexer reIndexer;
+    private final TaskManager taskManager;
+    private final ReIndexer reIndexer;
 
     @Inject
-    public void setReIndexer(@Named("reindexer") ReIndexer reIndexer) {
+    public ReIndexerManagement(TaskManager taskManager, @Named("reindexer") ReIndexer reIndexer) {
+        this.taskManager = taskManager;
         this.reIndexer = reIndexer;
     }
 
@@ -46,7 +50,8 @@ public class ReIndexerManagement implements ReIndexerManagementMBean {
                      .addContext(MDCBuilder.PROTOCOL, "CLI")
                      .addContext(MDCBuilder.ACTION, "reIndex")
                      .build()) {
-            reIndexer.reIndex(new MailboxPath(namespace, user, name));
+            TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, user, name)));
+            taskManager.await(taskId);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7c4f4d86/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml
----------------------------------------------------------------------
diff --git a/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml b/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml
index fa12ef6..aab5c34 100644
--- a/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml
+++ b/server/container/spring/src/main/resources/META-INF/org/apache/james/spring-server.xml
@@ -287,6 +287,9 @@
     <bean id="quotamanagermanagement" class="org.apache.james.adapter.mailbox.QuotaManagement"/>
     <bean id="reindexermanagement" class="org.apache.james.adapter.mailbox.ReIndexerManagement"/>
     <bean id="sievemanagerbean" class="org.apache.james.sieverepository.lib.SieveRepositoryManagement"/>
+
+
+    <bean id="taskManager" class="org.apache.james.task.MemoryTaskManager"/>
     <!--
         <bean id="james23importermanagement" class="org.apache.james.container.spring.tool.James23ImporterManagement"
/>
     -->


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message