james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 01/03: JAMES-2813 Inject TerminationSubscriber into EventSourcingTaskManager
Date Tue, 03 Sep 2019 15:30:26 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5cafcf00056a659e371586990761bd6ba61494ba
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Tue Aug 27 11:01:11 2019 +0200

    JAMES-2813 Inject TerminationSubscriber into EventSourcingTaskManager
---
 .../apache/james/DistributedTaskManagerModule.java |   4 +
 .../distributed/DistributedTaskManagerTest.java    |   3 +-
 .../eventsourcing/EventSourcingTaskManager.scala   |   6 +-
 .../apache/james/task/eventsourcing/Events.scala   |   8 +-
 .../task/eventsourcing/TerminationSubscriber.scala |  46 +++++++
 .../EventSourcingTaskManagerTest.java              |   2 +-
 .../MemoryTerminationSubscriberTest.java           |  27 ++++
 .../TerminationSubscriberContract.java             | 144 +++++++++++++++++++++
 8 files changed, 233 insertions(+), 7 deletions(-)

diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
index 4d2b2ab..7dd21ef 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
@@ -23,7 +23,9 @@ package org.apache.james;
 import org.apache.james.modules.server.HostnameModule;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
+import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.TerminationSubscriber;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
@@ -40,6 +42,8 @@ public class DistributedTaskManagerModule extends AbstractModule {
         bind(TaskManager.class).in(Scopes.SINGLETON);
         bind(WorkQueueSupplier.class).in(Scopes.SINGLETON);
         bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
+        bind(TerminationSubscriber.class).in(Scopes.SINGLETON);
+        bind(TerminationSubscriber.class).toInstance(new MemoryTerminationSubscriber());
         bind(TaskManager.class).to(EventSourcingTaskManager.class);
         bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class);
     }
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index ca70292..ced6652 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -48,6 +48,7 @@ import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.Hostname;
+import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
@@ -107,7 +108,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     private EventSourcingTaskManager taskManager(Hostname hostname) {
-        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname);
+        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, new MemoryTerminationSubscriber());
     }
 
     @Test
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index f57c947..93b8302 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -34,7 +34,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
                                                                                   workQueueSupplier: WorkQueueSupplier,
                                                                                   val eventStore: EventStore,
                                                                                   val executionDetailsProjection: TaskExecutionDetailsProjection,
-                                                                                  val hostname: Hostname) extends TaskManager with Closeable {
+                                                                                  val hostname: Hostname,
+                                                                                  val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
 
   private val delayBetweenPollingInMs = 500
 
@@ -60,7 +61,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
       new FailCommandHandler(loadHistory)),
     subscribers = Set(
       executionDetailsProjection.asSubscriber(hostname),
-      workDispatcher),
+      workDispatcher,
+      terminationSubscriber),
     eventStore = eventStore)
 
   private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
index 6d9c7ba..68730bd 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -28,6 +28,8 @@ sealed abstract class TaskEvent(aggregateId: TaskAggregateId, val eventId: Event
   override def getAggregateId: TaskAggregateId = aggregateId
 }
 
+sealed abstract class TerminalTaskEvent(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId)
+
 case class Hostname(private val value: String) {
   def asString: String = value
 }
@@ -50,8 +52,8 @@ case class Started(aggregateId: TaskAggregateId, override val eventId: EventId,
 
 case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
-case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TaskEvent(aggregateId, eventId)
+case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TerminalTaskEvent(aggregateId, eventId)
 
-case class Failed(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId)
+case class Failed(aggregateId: TaskAggregateId, override val eventId: EventId) extends TerminalTaskEvent(aggregateId, eventId)
 
-case class Cancelled(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId)
+case class Cancelled(aggregateId: TaskAggregateId, override val eventId: EventId) extends TerminalTaskEvent(aggregateId, eventId)
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
new file mode 100644
index 0000000..af23af4
--- /dev/null
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -0,0 +1,46 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.{Event, Subscriber}
+import org.reactivestreams.Publisher
+
+import reactor.core.publisher.DirectProcessor
+
+trait TerminationSubscriber extends Subscriber {
+  override def handle(event: Event): Unit = event match {
+    case event: TerminalTaskEvent => addEvent(event)
+    case _ =>
+  }
+
+  def addEvent(event: Event): Unit
+
+  def listenEvents: Publisher[Event]
+}
+
+class MemoryTerminationSubscriber extends TerminationSubscriber {
+  private val events = DirectProcessor.create[Event]()
+
+  override def addEvent(event: Event): Unit =
+    events.onNext(event)
+
+  override def listenEvents: Publisher[Event] =
+    events.share()
+}
\ No newline at end of file
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index 281e217..cb5f19a 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -61,7 +61,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
             TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
             return new MemoryWorkQueue(worker);
         };
-        taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME);
+        taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME, new MemoryTerminationSubscriber());
     }
 
     @AfterEach
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java
new file mode 100644
index 0000000..b8d2707
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java
@@ -0,0 +1,27 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.task.eventsourcing;
+
+class MemoryTerminationSubscriberTest implements TerminationSubscriberContract {
+    public TerminationSubscriber subscriber() {
+        return new MemoryTerminationSubscriber();
+    }
+}
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
new file mode 100644
index 0000000..5b0f5ff
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -0,0 +1,144 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.task.eventsourcing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+
+import org.assertj.core.api.ListAssert;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public interface TerminationSubscriberContract {
+
+    Completed COMPLETED_EVENT = new Completed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), Task.Result.COMPLETED);
+    Failed FAILED_EVENT = new Failed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42));
+    Cancelled CANCELLED_EVENT = new Cancelled(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42));
+    Duration DELAY_BETWEEN_EVENTS = Duration.ofMillis(50);
+    Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50);
+
+    TerminationSubscriber subscriber();
+
+    @Test
+    default void handlingCompletedShouldBeListed() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, COMPLETED_EVENT);
+
+        assertEvents(subscriber).containsOnly(COMPLETED_EVENT);
+    }
+
+    @Test
+    default void handlingFailedShouldBeListed() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, FAILED_EVENT);
+
+        assertEvents(subscriber).containsOnly(FAILED_EVENT);
+    }
+
+    @Test
+    default void handlingCancelledShouldBeListed() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, CANCELLED_EVENT);
+
+        assertEvents(subscriber).containsOnly(CANCELLED_EVENT);
+    }
+
+    @Test
+    default void handlingNonTerminalEventShouldNotBeListed() {
+        TerminationSubscriber subscriber = subscriber();
+        TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo"));
+
+        sendEvents(subscriber, event);
+
+        assertEvents(subscriber).isEmpty();
+    }
+
+    @Test
+    default void handlingMultipleEventsShouldBeListed() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+
+        assertEvents(subscriber).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+    }
+
+    @Test
+    default void multipleListeningEventsShouldShareEvents() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+
+        List<List<Event>> listenedEvents = Flux.range(0, 2)
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(ignored -> collectEvents(subscriber))
+            .collectList()
+            .block();
+        assertThat(listenedEvents).hasSize(2);
+        assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+        assertThat(listenedEvents.get(1)).isEqualTo(listenedEvents.get(0));
+    }
+
+    @Test
+    default void dynamicListeningEventsShouldGetOnlyNewEvents() {
+        TerminationSubscriber subscriber = subscriber();
+
+        sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+
+        List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2)))
+            .then(Mono.defer(() -> collectEvents(subscriber)))
+            .subscribeOn(Schedulers.elastic())
+            .block();
+        assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT);
+    }
+
+    default ListAssert<Event> assertEvents(TerminationSubscriber subscriber) {
+        return assertThat(collectEvents(subscriber)
+            .block());
+    }
+
+    default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) {
+        return Flux.from(subscriber.listenEvents())
+            .subscribeOn(Schedulers.elastic())
+            .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7)))
+            .collectList();
+    }
+
+    default void sendEvents(TerminationSubscriber subscriber, Event... events) {
+        Mono.delay(DELAY_BEFORE_PUBLISHING)
+            .flatMapMany(ignored -> Flux.fromArray(events)
+                .subscribeOn(Schedulers.elastic())
+                .delayElements(DELAY_BETWEEN_EVENTS)
+                .doOnNext(subscriber::handle))
+            .subscribe();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message