james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 05/06: JAMES-2873 Add test to ensure DistributedTaskManager track nodes correctly
Date Tue, 03 Sep 2019 07:34:27 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1c8295a687ca95169ab67916f83503018498cad6
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Mon Aug 26 11:32:34 2019 +0200

    JAMES-2873 Add test to ensure DistributedTaskManager track nodes correctly
---
 .../distributed/DistributedTaskManagerTest.java    | 87 ++++++++++++----------
 1 file changed, 48 insertions(+), 39 deletions(-)

diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 994edca..ca70292 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -102,24 +102,28 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         this.eventStore = eventStore;
     }
 
-    public TaskManager taskManager() {
+    public EventSourcingTaskManager taskManager() {
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME);
     }
 
+    private EventSourcingTaskManager taskManager(Hostname hostname) {
+        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname);
+    }
+
     @Test
     void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
-        TaskManager taskManager1 = taskManager();
-        TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2);
-
-        TaskId taskId = taskManager1.submit(new CompletedTask());
-        Awaitility.await()
-            .atMost(Duration.FIVE_SECONDS)
-            .pollInterval(100L, TimeUnit.MILLISECONDS)
-            .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED);
-
-        TaskExecutionDetails detailsFromTaskManager1 = taskManager1.getExecutionDetails(taskId);
-        TaskExecutionDetails detailsFromTaskManager2 = taskManager2.getExecutionDetails(taskId);
-        assertThat(detailsFromTaskManager1).isEqualTo(detailsFromTaskManager2);
+        try (EventSourcingTaskManager taskManager1 = taskManager();
+             EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
+            TaskId taskId = taskManager1.submit(new CompletedTask());
+            Awaitility.await()
+                .atMost(Duration.FIVE_SECONDS)
+                .pollInterval(100L, TimeUnit.MILLISECONDS)
+                .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED);
+
+            TaskExecutionDetails detailsFromTaskManager1 = taskManager1.getExecutionDetails(taskId);
+            TaskExecutionDetails detailsFromTaskManager2 = taskManager2.getExecutionDetails(taskId);
+            assertThat(detailsFromTaskManager1).isEqualTo(detailsFromTaskManager2);
+        }
     }
 
     @Test
@@ -133,40 +137,45 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() throws InterruptedException {
+    void givenTwoTaskManagersAndTwoTasksOnlyOneTaskShouldRunAtTheSameTime() {
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
-        TaskManager taskManager1 = taskManager();
-        TaskManager taskManager2 = taskManager();
+        try (EventSourcingTaskManager taskManager1 = taskManager();
+             EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
 
-        taskManager1.submit(new MemoryReferenceTask(() -> {
-            waitingForFirstTaskLatch.await();
-            return Task.Result.COMPLETED;
-        }));
-        TaskId waitingTaskId = taskManager1.submit(new CompletedTask());
+            taskManager1.submit(new MemoryReferenceTask(() -> {
+                waitingForFirstTaskLatch.await();
+                return Task.Result.COMPLETED;
+            }));
+            TaskId waitingTaskId = taskManager1.submit(new CompletedTask());
 
-        awaitUntilTaskHasStatus(waitingTaskId, TaskManager.Status.WAITING, taskManager2);
-        waitingForFirstTaskLatch.countDown();
+            awaitUntilTaskHasStatus(waitingTaskId, TaskManager.Status.WAITING, taskManager2);
+            waitingForFirstTaskLatch.countDown();
 
-        Awaitility.await()
-            .atMost(Duration.ONE_SECOND)
-            .pollInterval(100L, TimeUnit.MILLISECONDS)
-            .until(() -> taskManager1.await(waitingTaskId).getStatus() == TaskManager.Status.COMPLETED);
+            Awaitility.await()
+                .atMost(Duration.ONE_SECOND)
+                .pollInterval(100L, TimeUnit.MILLISECONDS)
+                .until(() -> taskManager1.await(waitingTaskId).getStatus() == TaskManager.Status.COMPLETED);
+        }
     }
 
     @Test
-        // FIXME it's currently dependent of the implementation of the sequential TaskManager with the exclusive RabbitMQ consumer
-        // once we store the node where the event have been created/started/completed we should rewrite it with this information.
     void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
-        TaskManager taskManager1 = taskManager();
-        Thread.sleep(100); // FIXME used to ensure that taskManager1 is the worker consuming from rabbit
-        TaskManager taskManager2 = taskManager();
-
-        TaskId taskId = taskManager2.submit(new CompletedTask());
-
-        Awaitility.await()
-            .atMost(Duration.ONE_SECOND)
-            .pollInterval(100L, TimeUnit.MILLISECONDS)
-            .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED);
+        try (EventSourcingTaskManager taskManager1 = taskManager()) {
+            Thread.sleep(100);
+            try (EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
+
+                TaskId taskId = taskManager2.submit(new CompletedTask());
+
+                Awaitility.await()
+                    .atMost(Duration.ONE_SECOND)
+                    .pollInterval(100L, TimeUnit.MILLISECONDS)
+                    .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED);
+
+                TaskExecutionDetails executionDetails = taskManager2.getExecutionDetails(taskId);
+                assertThat(executionDetails.getSubmittedNode()).isEqualTo(HOSTNAME_2);
+                assertThat(executionDetails.getRanNode()).contains(HOSTNAME);
+            }
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message