james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 04/06: JAMES-2813 Make RabbitMQWorkQueue cluster-serialized
Date Tue, 03 Sep 2019 07:34:26 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aaf5257c40abac268fe1d1f0cb344cf7b9023963
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Thu Aug 22 16:38:40 2019 +0200

    JAMES-2813 Make RabbitMQWorkQueue cluster-serialized
---
 server/task-distributed/pom.xml                    |  10 ++
 .../distributed/RabbitMQExclusiveConsumer.java     | 174 +++++++++++++++++++++
 .../distributed/RabbitMQWorkQueue.java             | 146 +++++++++++++++++
 .../distributed/RabbitMQWorkQueueTest.java         | 147 +++++++++++++++++
 .../apache/james/task/SerialTaskManagerWorker.java |  15 +-
 .../org/apache/james/task/TaskManagerWorker.java   |   2 +
 6 files changed, 489 insertions(+), 5 deletions(-)

diff --git a/server/task-distributed/pom.xml b/server/task-distributed/pom.xml
index 844c693..89b4edf 100644
--- a/server/task-distributed/pom.xml
+++ b/server/task-distributed/pom.xml
@@ -39,6 +39,16 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-cassandra</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
new file mode 100644
index 0000000..2977312
--- /dev/null
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
@@ -0,0 +1,174 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task.eventsourcing.distributed;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.RabbitFluxException;
+import reactor.rabbitmq.ReceiverOptions;
+
+/**
+ * Taken from {@link reactor.rabbitmq.Receiver}
+ * In order to be able to set the `exclusive` parameter to `true`
+ * to the `channel.basicConsume` method.
+ *
+ * @deprecated to remove once the parallel execution of task has been implemented
+ */
+@Deprecated
+public class RabbitMQExclusiveConsumer implements Closeable {
+    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION =
new RabbitMQExclusiveConsumer.ChannelCreationFunction();
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQExclusiveConsumer.class);
+
+    private static final boolean NON_LOCAL = true;
+    private static final boolean EXCLUSIVE = true;
+
+    private static class ChannelCreationFunction implements Function<Connection, Channel>
{
+
+        @Override
+        public Channel apply(Connection connection) {
+            try {
+                return connection.createChannel();
+            } catch (IOException e) {
+                throw new RabbitFluxException("Error while creating channel", e);
+            }
+        }
+    }
+
+    private Mono<? extends Connection> connectionMono;
+    private final AtomicBoolean hasConnection;
+    private final Scheduler connectionSubscriptionScheduler;
+    private final boolean privateConnectionSubscriptionScheduler;
+
+    public RabbitMQExclusiveConsumer(ReceiverOptions options) {
+        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler()
== null;
+        this.connectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler()
== null ?
+            createScheduler("rabbitmq-receiver-connection-subscription") : options.getConnectionSubscriptionScheduler();
+        hasConnection = new AtomicBoolean(false);
+        this.connectionMono = options.getConnectionMono() != null ? options.getConnectionMono()
:
+            Mono.fromCallable(() -> options.getConnectionFactory().newConnection())
+                .doOnSubscribe(c -> hasConnection.set(true))
+                .subscribeOn(this.connectionSubscriptionScheduler)
+                .cache();
+    }
+
+    protected Scheduler createScheduler(String name) {
+        return Schedulers.newElastic(name);
+    }
+
+
+    public Flux<AcknowledgableDelivery> consumeExclusiveManualAck(final String queue,
ConsumeOptions options) {
+        // TODO track flux so it can be disposed when the sender is closed?
+        // could be also developer responsibility
+        return Flux.create(emitter -> connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel
-> {
+            try {
+                if (options.getQos() != 0) {
+                    channel.basicQos(options.getQos());
+                }
+
+                DeliverCallback deliverCallback = (consumerTag, message) -> {
+                    AcknowledgableDelivery delivery = new AcknowledgableDelivery(message,
channel, options.getExceptionHandler());
+                    if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(),
delivery)) {
+                        emitter.next(delivery);
+                    }
+                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(),
message)) {
+                        emitter.complete();
+                    }
+                };
+
+                AtomicBoolean basicCancel = new AtomicBoolean(true);
+                CancelCallback cancelCallback = consumerTag -> {
+                    LOGGER.info("Flux consumer {} has been cancelled", consumerTag);
+                    basicCancel.set(false);
+                    emitter.complete();
+                };
+
+                completeOnChannelShutdown(channel, emitter);
+
+                Map<String, Object> arguments = ImmutableMap.of();
+                final String consumerTag = channel.basicConsume(queue, false, UUID.randomUUID().toString(),
!NON_LOCAL, EXCLUSIVE, arguments, deliverCallback, cancelCallback);
+                AtomicBoolean cancelled = new AtomicBoolean(false);
+                LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag,
queue);
+                emitter.onDispose(() -> {
+                    LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag,
queue);
+                    if (cancelled.compareAndSet(false, true)) {
+                        try {
+                            if (channel.isOpen() && channel.getConnection().isOpen())
{
+                                if (basicCancel.compareAndSet(true, false)) {
+                                    channel.basicCancel(consumerTag);
+                                }
+                                channel.close();
+                            }
+                        } catch (TimeoutException | IOException e) {
+                            // Not sure what to do, not much we can do,
+                            // logging should be enough.
+                            // Maybe one good reason to introduce an exception handler to choose more easily.
+                            LOGGER.warn("Error while closing channel: " + e.getMessage());
+                        }
+                    }
+                });
+            } catch (IOException e) {
+                throw new RabbitFluxException(e);
+            }
+        }, emitter::error), options.getOverflowStrategy());
+    }
+
+    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter)
{
+        channel.addShutdownListener(reason -> {
+            if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason))
{
+                emitter.complete();
+            }
+        });
+    }
+
+    public void close() {
+        if (hasConnection.getAndSet(false)) {
+            try {
+                // FIXME use timeout on block (should be a parameter of the Receiver)
+                connectionMono.block().close();
+            } catch (IOException e) {
+                throw new RabbitFluxException(e);
+            }
+        }
+        if (privateConnectionSubscriptionScheduler) {
+            this.connectionSubscriptionScheduler.dispose();
+        }
+    }
+}
diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
new file mode 100644
index 0000000..69cdf16
--- /dev/null
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -0,0 +1,146 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.task.eventsourcing.distributed;
+
+import java.nio.charset.StandardCharsets;
+
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
+import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManagerWorker;
+import org.apache.james.task.TaskWithId;
+import org.apache.james.task.WorkQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+public class RabbitMQWorkQueue implements WorkQueue, Startable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
+
+    static final Integer MAX_CHANNELS_NUMBER = 1;
+    static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
+    static final String QUEUE_NAME = "taskManagerWorkQueue";
+    static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
+    public static final String TASK_ID = "taskId";
+
+    private final TaskManagerWorker worker;
+    private final Mono<Connection> connectionMono;
+    private final ReactorRabbitMQChannelPool channelPool;
+    private final JsonTaskSerializer taskSerializer;
+    private Sender sender;
+    private RabbitMQExclusiveConsumer receiver;
+
+    public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool,
JsonTaskSerializer taskSerializer) {
+        this.worker = worker;
+        this.connectionMono = simpleConnectionPool.getResilientConnection();
+        this.taskSerializer = taskSerializer;
+        this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
+    }
+
+    public void start() {
+        sender = RabbitFlux.createSender(new SenderOptions()
+            .connectionMono(connectionMono)
+            .channelPool(channelPool)
+            .resourceManagementChannelMono(
+                connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
+
+        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
+
+        sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
+        sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+        sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
+
+        consumeWorkqueue();
+    }
+
+    private void consumeWorkqueue() {
+        receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(this::executeTask)
+            .subscribe();
+    }
+
+    private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
+        String json = new String(delivery.getBody(), StandardCharsets.UTF_8);
+
+        TaskId taskId = TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString());
+
+        try {
+            Task task = taskSerializer.deserialize(json);
+            delivery.ack();
+            return worker.executeTask(new TaskWithId(taskId, task));
+        } catch (Exception e) {
+            LOGGER.error("Unable to run submitted Task " + taskId.asString(), e);
+            delivery.ack();
+            worker.fail(taskId, e);
+            return Mono.empty();
+        }
+    }
+
+    @Override
+    public void submit(TaskWithId taskWithId) {
+        try {
+            byte[] payload = taskSerializer.serialize(taskWithId.getTask()).getBytes(StandardCharsets.UTF_8);
+            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
+                .headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString()))
+                .build();
+            OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY,
basicProperties, payload);
+            sender.send(Mono.just(outboundMessage)).block();
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void cancel(TaskId taskId) {
+         throw new NotImplementedException("Cancel not done yet");
+    }
+
+    @Override
+    @PreDestroy
+    public void close() {
+        channelPool.close();
+    }
+}
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
new file mode 100644
index 0000000..2383695
--- /dev/null
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -0,0 +1,147 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+package org.apache.james.task.eventsourcing.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.stream.IntStream;
+
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.server.task.json.TestTask;
+import org.apache.james.server.task.json.dto.TestTaskDTOModules;
+import org.apache.james.task.CompletedTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManagerWorker;
+import org.apache.james.task.TaskWithId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.ArgumentCaptor;
+import org.mockito.verification.Timeout;
+
+import reactor.core.publisher.Mono;
+
+class RabbitMQWorkQueueTest {
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final TaskId TASK_ID_2 = TaskId.fromString("3c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final Task TASK = new CompletedTask();
+    private static final Task TASK2 = new CompletedTask();
+    private static final TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
+    private static final TaskWithId TASK_WITH_ID_2 = new TaskWithId(TASK_ID_2, TASK2);
+
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
+
+    private RabbitMQWorkQueue testee;
+    private TaskManagerWorker taskManagerWorker;
+    private JsonTaskSerializer taskSerializer;
+
+    @BeforeEach
+    void setUp() {
+        taskManagerWorker = mock(TaskManagerWorker.class);
+        when(taskManagerWorker.executeTask(TASK_WITH_ID)).thenReturn(Mono.just(Task.Result.COMPLETED));
+        when(taskManagerWorker.executeTask(TASK_WITH_ID_2)).thenReturn(Mono.just(Task.Result.COMPLETED));
+        taskSerializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
+        testee = new RabbitMQWorkQueue(taskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(),
taskSerializer);
+        testee.start();
+    }
+
+    @AfterEach
+    void tearDown() {
+        testee.close();
+    }
+
+    @Test
+    void workerShouldConsumeSubmittedTask() {
+        testee.submit(TASK_WITH_ID);
+
+        ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
+        verify(taskManagerWorker, timeout(1000)).executeTask(taskWithIdCaptor.capture());
+
+        TaskWithId actualTaskWithId = taskWithIdCaptor.getValue();
+        assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
+        assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
+    }
+
+    @Test
+    void workerShouldConsumeTwoSubmittedTask() {
+        testee.submit(TASK_WITH_ID);
+        testee.submit(TASK_WITH_ID_2);
+
+        ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
+        verify(taskManagerWorker, new Timeout(1000, times(2))).executeTask(taskWithIdCaptor.capture());
+
+        TaskWithId actualTaskWithId = taskWithIdCaptor.getAllValues().get(0);
+        assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
+        assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
+
+        TaskWithId actualSecondTaskWithId = taskWithIdCaptor.getAllValues().get(1);
+        assertThat(actualSecondTaskWithId.getId()).isEqualTo(TASK_ID_2);
+        assertThat(actualSecondTaskWithId.getTask().type()).isEqualTo(TASK2.type());
+    }
+
+    @Test
+    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
+        testee.submit(TASK_WITH_ID);
+
+        TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
+        RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker,
rabbitMQExtension.getRabbitConnectionPool(), taskSerializer);
+        otherWorkQueue.start();
+
+        IntStream.range(0, 9)
+            .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2));
+
+        verify(taskManagerWorker, new Timeout(1000, times(10))).executeTask(any());
+
+        verify(otherTaskManagerWorker, new Timeout(1000, times(0))).executeTask(any());
+    }
+
+    @Test
+    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks()
throws InterruptedException {
+        Task task = new TestTask(42);
+        TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
+        TaskWithId taskWithId = new TaskWithId(taskId, task);
+
+        TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
+        JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
+        RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker,
rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer);
+        //wait to be sure that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
+        Thread.sleep(200);
+        otherWorkQueue.start();
+
+        otherWorkQueue.submit(taskWithId);
+
+        verify(taskManagerWorker, new Timeout(100, times(0))).executeTask(any());
+        verify(taskManagerWorker, timeout(100)).fail(eq(taskId), any());
+
+        testee.submit(TASK_WITH_ID);
+        verify(taskManagerWorker, timeout(100)).executeTask(any());
+    }
+}
diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index afb7a39..c5a0638 100644
--- a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -60,11 +60,11 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
-            return Mono
-                .using(
-                    acquireSemaphore(taskWithId, listener),
-                    executeWithSemaphore(taskWithId, listener),
-                    Semaphore::release);
+        return Mono
+            .using(
+                acquireSemaphore(taskWithId, listener),
+                executeWithSemaphore(taskWithId, listener),
+                Semaphore::release);
 
     }
 
@@ -140,6 +140,11 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     }
 
     @Override
+    public void fail(TaskId taskId, Throwable reason) {
+        listener.failed(taskId, reason);
+    }
+
+    @Override
     public void close() throws IOException {
         taskExecutor.shutdownNow();
     }
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
index 4780cc7..94051c7 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -39,4 +39,6 @@ public interface TaskManagerWorker extends Closeable {
     Mono<Task.Result> executeTask(TaskWithId taskWithId);
 
     void cancelTask(TaskId taskId);
+
+    void fail(TaskId taskId, Throwable reason);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message