james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 02/06: JAMES-2813 Implement TaskManagerContract for DistributedTaskManager
Date Tue, 03 Sep 2019 07:34:24 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5ed834b661015abb71da11d30fee679efd395f77
Author: RĂ©mi Kowalski <rkowalski@linagora.com>
AuthorDate: Thu Aug 22 12:04:54 2019 +0200

    JAMES-2813 Implement TaskManagerContract for DistributedTaskManager
---
 .../apache/james/DistributedTaskManagerModule.java | 12 +---
 .../distributed/RabbitMQWorkQueueSupplier.scala    | 38 +++++++++++++
 .../distributed/DistributedTaskManagerTest.java    | 66 +++++++++++++++-------
 3 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
index f5cb955..4d2b2ab 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
@@ -21,25 +21,17 @@
 package org.apache.james;
 
 import org.apache.james.modules.server.HostnameModule;
-import org.apache.james.task.MemoryWorkQueue;
-import org.apache.james.task.SerialTaskManagerWorker;
 import org.apache.james.task.TaskManager;
-import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
-import org.apache.james.task.eventsourcing.WorkerStatusListener;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 
 public class DistributedTaskManagerModule extends AbstractModule {
-    public static final WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
-        WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
-        TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
-        return new MemoryWorkQueue(worker);
-    };
 
     @Override
     protected void configure() {
@@ -49,6 +41,6 @@ public class DistributedTaskManagerModule extends AbstractModule {
         bind(WorkQueueSupplier.class).in(Scopes.SINGLETON);
         bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
         bind(TaskManager.class).to(EventSourcingTaskManager.class);
-        bind(WorkQueueSupplier.class).toInstance(workQueueSupplier);
+        bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class);
     }
 }
diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
new file mode 100644
index 0000000..fc5d7e2
--- /dev/null
+++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -0,0 +1,38 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing.distributed
+
+import javax.inject.Inject
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool
+import org.apache.james.eventsourcing.EventSourcingSystem
+import org.apache.james.server.task.json.JsonTaskSerializer
+import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener}
+import org.apache.james.task.{SerialTaskManagerWorker, WorkQueue}
+
+@Inject
+class RabbitMQWorkQueueSupplier(private val rabbitMQConnectionPool: SimpleConnectionPool,
+                                private val jsonTaskSerializer: JsonTaskSerializer) extends
WorkQueueSupplier {
+  override def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue = {
+    val listener = WorkerStatusListener(eventSourcingSystem)
+    val worker = new SerialTaskManagerWorker(listener)
+    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
+    rabbitMQWorkQueue.start()
+    rabbitMQWorkQueue
+  }
+}
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 219fd06..7561181 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -22,6 +22,7 @@ package org.apache.james.task.eventsourcing.distributed;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
@@ -35,31 +36,42 @@ import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreEx
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
-import org.apache.james.task.SerialTaskManagerWorker;
+import org.apache.james.task.CountDownLatchExtension;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
-import org.apache.james.task.TaskManagerWorker;
+import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
-import org.apache.james.task.eventsourcing.WorkerStatusListener;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.steveash.guavate.Guavate;
 
-class DistributedTaskManagerTest {
-    private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
+@ExtendWith(CountDownLatchExtension.class)
+class DistributedTaskManagerTest implements TaskManagerContract {
 
+    private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(
+        TestTaskDTOModules.COMPLETED_TASK_MODULE,
+        TestTaskDTOModules.FAILED_TASK_MODULE,
+        TestTaskDTOModules.THROWING_TASK_MODULE,
+        TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
+
+    private static final Hostname HOSTNAME = new Hostname("foo");
+    private static final Hostname HOSTNAME_2 = new Hostname("bar");
     private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet());
 
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
@@ -75,22 +87,26 @@ class DistributedTaskManagerTest {
     @RegisterExtension
     static CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(cassandraCluster,
MODULES);
 
+    private final CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
+    private final CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO
= new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+    private final TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
+
+    WorkQueueSupplier workQueueSupplier = new RabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(),
TASK_SERIALIZER);
+    private EventStore eventStore;
+
+    @BeforeEach
+    void setUp(EventStore eventStore) {
+        this.eventStore = eventStore;
+    }
+
+    public TaskManager taskManager() {
+        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection,
HOSTNAME);
+    }
+
     @Test
-    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents(EventStore eventStore)
{
-        CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
-        CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO
= new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
-        TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
-
-        WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
-            WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
-            TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
-            RabbitMQWorkQueue rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitConnectionPool(),
TASK_SERIALIZER);
-            rabbitMQWorkQueue.start();
-            return rabbitMQWorkQueue;
-
-        };
-        TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore,
executionDetailsProjection, new Hostname("foo"));
-        TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore,
executionDetailsProjection, new Hostname("bar"));
+    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
+        TaskManager taskManager1 = taskManager();
+        TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore,
executionDetailsProjection, HOSTNAME_2);
 
         TaskId taskId = taskManager1.submit(new CompletedTask());
         Awaitility.await()
@@ -102,4 +118,14 @@ class DistributedTaskManagerTest {
         TaskExecutionDetails detailsFromTaskManager2 = taskManager2.getExecutionDetails(taskId);
         assertThat(detailsFromTaskManager1).isEqualTo(detailsFromTaskManager2);
     }
+
+    @Test
+    @Disabled("Cancelling is not supported yet")
+    public void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
+    }
+
+    @Test
+    @Disabled("Cancelling is not supported yet")
+    public void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message