james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 01/29: JAMES-2813 Add cancel support in DistributedTaskManager
Date Tue, 03 Sep 2019 15:39:53 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da774c0a0062ec55993eb126e7b2bf2069e1b062
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Wed Aug 28 16:05:52 2019 +0200

    JAMES-2813 Add cancel support in DistributedTaskManager
---
 .../distributed/RabbitMQWorkQueue.java             | 66 ++++++++++++++++++++--
 .../distributed/DistributedTaskManagerTest.java    | 55 +++++++++++++-----
 .../org/apache/james/task/TaskManagerContract.java |  6 +-
 3 files changed, 104 insertions(+), 23 deletions(-)

diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index e954c31..7b80e6d 100644
--- a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,10 +21,11 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.UUID;
 
 import javax.annotation.PreDestroy;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.lifecycle.api.Startable;
@@ -42,7 +43,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.BindingSpecification;
@@ -50,16 +55,22 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
 
-    static final Integer MAX_CHANNELS_NUMBER = 1;
+    // Need at least one by receivers plus a shared one for senders
+    static final Integer MAX_CHANNELS_NUMBER = 5;
     static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
     static final String QUEUE_NAME = "taskManagerWorkQueue";
     static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
+
+    static final String CANCEL_REQUESTS_EXCHANGE_NAME = "taskManagerCancelRequestsExchange";
+    static final String CANCEL_REQUESTS_ROUTING_KEY = "taskManagerCancelRequestsRoutingKey";
+    private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue";
     public static final String TASK_ID = "taskId";
 
     private final TaskManagerWorker worker;
@@ -68,6 +79,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private final JsonTaskSerializer taskSerializer;
     private Sender sender;
     private RabbitMQExclusiveConsumer receiver;
+    private UnicastProcessor<TaskId> sendCancelRequestsQueue;
+    private Disposable sendCancelRequestsQueueHandle;
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool,
JsonTaskSerializer taskSerializer) {
         this.worker = worker;
@@ -77,10 +90,12 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     public void start() {
-        sender = channelPool.createSender();
-
-        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
+        startWorkqueue();
+        listenToCancelRequests();
+    }
 
+    private void startWorkqueue() {
+        sender = channelPool.createSender();
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
@@ -89,6 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     private void consumeWorkqueue() {
+        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
         receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.elastic())
             .flatMap(this::executeTask)
@@ -112,6 +128,43 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
         }
     }
 
+    void listenToCancelRequests() {
+        Sender cancelRequestSender = channelPool.createSender();
+        String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
+
+        cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
+        cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
+        cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME,
CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
+        registerCancelRequestsListener(queueName);
+
+        sendCancelRequestsQueue = UnicastProcessor.create();
+        sendCancelRequestsQueueHandle = cancelRequestSender
+            .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
+
+    private void registerCancelRequestsListener(String queueName) {
+        RabbitFlux
+            .createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+            .consumeAutoAck(queueName)
+            .subscribeOn(Schedulers.elastic())
+            .map(this::readCancelRequestMessage)
+            .doOnNext(worker::cancelTask)
+            .subscribe();
+    }
+
+    private TaskId readCancelRequestMessage(Delivery delivery) {
+        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+        return TaskId.fromString(message);
+    }
+
+    private OutboundMessage makeCancelRequestMessage(TaskId taskId) {
+        byte[] payload = taskId.asString().getBytes(StandardCharsets.UTF_8);
+        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build();
+        return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY,
basicProperties, payload);
+    }
+
     @Override
     public void submit(TaskWithId taskWithId) {
         try {
@@ -128,12 +181,13 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
 
     @Override
     public void cancel(TaskId taskId) {
-         throw new NotImplementedException("Cancel not done yet");
+        sendCancelRequestsQueue.onNext(taskId);
     }
 
     @Override
     @PreDestroy
     public void close() {
+        Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         channelPool.close();
     }
 }
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 5db982d..e38edd9 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -54,16 +55,15 @@ import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
+
+import com.github.steveash.guavate.Guavate;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.github.steveash.guavate.Guavate;
-
 @ExtendWith(CountDownLatchExtension.class)
 class DistributedTaskManagerTest implements TaskManagerContract {
 
@@ -131,17 +131,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    @Disabled("Cancelling is not supported yet")
-    public void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
-    }
-
-    @Test
-    @Disabled("Cancelling is not supported yet")
-    public void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
-    }
-
-    @Test
-    void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() {
+    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws InterruptedException
{
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
         try (EventSourcingTaskManager taskManager1 = taskManager();
@@ -182,4 +172,41 @@ class DistributedTaskManagerTest implements TaskManagerContract {
             }
         }
     }
+
+    @Test
+    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch
countDownLatch) {
+        TaskManager taskManager1 = taskManager(HOSTNAME);
+        TaskManager taskManager2 = taskManager(HOSTNAME_2);
+        TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
+            countDownLatch.await();
+            return Task.Result.COMPLETED;
+        }));
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager1);
+        Hostname runningNode = taskManager1.getExecutionDetails(id).getRanNode().get();
+
+        Pair<Hostname, TaskManager> remoteTaskManager = getOtherTaskManager(runningNode,
Pair.of(HOSTNAME, taskManager1), Pair.of(HOSTNAME_2, taskManager2));
+        remoteTaskManager.getValue().cancel(id);
+
+        awaitAtMostFiveSeconds.untilAsserted(() ->
+            assertThat(taskManager1.getExecutionDetails(id).getStatus())
+                .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
+
+        countDownLatch.countDown();
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager1);
+        assertThat(taskManager1.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+
+        assertThat(taskManager1.getExecutionDetails(id).getCancelRequestedNode())
+            .contains(remoteTaskManager.getKey());
+    }
+
+    private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname node, Pair<Hostname,
TaskManager> taskManager1, Pair<Hostname, TaskManager>  taskManager2) {
+        if (node.equals(taskManager1.getKey())) {
+            return taskManager2;
+        } else {
+            return taskManager1;
+        }
+    }
 }
diff --git a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
index 31def77..2fa605d 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -141,15 +141,15 @@ public interface TaskManagerContract {
         awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager);
         taskManager.cancel(id);
 
-        assertThat(taskManager.getExecutionDetails(id).getStatus())
-            .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+        awaitAtMostFiveSeconds.untilAsserted(() ->
+            assertThat(taskManager.getExecutionDetails(id).getStatus())
+                .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
 
         countDownLatch.countDown();
 
         awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager);
         assertThat(taskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
-
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message