james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 03/06: JAMES-2813 Add DistributedTaskManagerTest cases with multiple TaskManagers
Date Tue, 03 Sep 2019 07:34:25 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f6b8deb6d0b7daeb2e4614bf7dda0b2d2dea3ef2
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Thu Aug 22 15:07:56 2019 +0200

    JAMES-2813 Add DistributedTaskManagerTest cases with multiple TaskManagers
---
 .../distributed/DistributedTaskManagerTest.java    | 43 +++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 7561181..994edca 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -40,6 +40,8 @@ import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
 import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.MemoryReferenceTask;
+import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
@@ -91,11 +93,12 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     private final CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
     private final TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
 
-    WorkQueueSupplier workQueueSupplier = new RabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER);
+    private WorkQueueSupplier workQueueSupplier;
     private EventStore eventStore;
 
     @BeforeEach
     void setUp(EventStore eventStore) {
+        workQueueSupplier = new RabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER);
         this.eventStore = eventStore;
     }
 
@@ -128,4 +131,42 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     @Disabled("Cancelling is not supported yet")
     public void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
     }
+
+    @Test
+    void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() throws InterruptedException {
+        CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
+
+        TaskManager taskManager1 = taskManager();
+        TaskManager taskManager2 = taskManager();
+
+        taskManager1.submit(new MemoryReferenceTask(() -> {
+            waitingForFirstTaskLatch.await();
+            return Task.Result.COMPLETED;
+        }));
+        TaskId waitingTaskId = taskManager1.submit(new CompletedTask());
+
+        awaitUntilTaskHasStatus(waitingTaskId, TaskManager.Status.WAITING, taskManager2);
+        waitingForFirstTaskLatch.countDown();
+
+        Awaitility.await()
+            .atMost(Duration.ONE_SECOND)
+            .pollInterval(100L, TimeUnit.MILLISECONDS)
+            .until(() -> taskManager1.await(waitingTaskId).getStatus() == TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+        // FIXME it's currently dependent on the implementation of the sequential TaskManager with the exclusive RabbitMQ consumer
+        // once we store the node where the event has been created/started/completed we should rewrite it with this information.
+    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
+        TaskManager taskManager1 = taskManager();
+        Thread.sleep(100); // FIXME used to ensure that taskManager1 is the worker consuming from rabbit
+        TaskManager taskManager2 = taskManager();
+
+        TaskId taskId = taskManager2.submit(new CompletedTask());
+
+        Awaitility.await()
+            .atMost(Duration.ONE_SECOND)
+            .pollInterval(100L, TimeUnit.MILLISECONDS)
+            .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message