james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rouaz...@apache.org
Subject [james-project] 02/03: JAMES-2813 Add RabbitMQTerminationSubscriber
Date Tue, 03 Sep 2019 15:30:27 GMT
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5ff63262face2880e50cb1c775fcc9b1c13fc320
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Tue Aug 27 16:10:42 2019 +0200

    JAMES-2813 Add RabbitMQTerminationSubscriber
---
 .../apache/james/DistributedTaskManagerModule.java |   4 +-
 .../distributed/RabbitMQTerminationSubscriber.java | 146 +++++++++++++++++++++
 .../distributed/DistributedTaskManagerTest.java    |  15 ++-
 .../RabbitMQTerminationSubscriberTest.java         |  73 +++++++++++
 4 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
index 7dd21ef..76197db 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java
@@ -23,11 +23,11 @@ package org.apache.james;
 import org.apache.james.modules.server.HostnameModule;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
-import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.TerminationSubscriber;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber;
 import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
 
 import com.google.inject.AbstractModule;
@@ -43,7 +43,7 @@ public class DistributedTaskManagerModule extends AbstractModule {
         bind(WorkQueueSupplier.class).in(Scopes.SINGLETON);
         bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
         bind(TerminationSubscriber.class).in(Scopes.SINGLETON);
-        bind(TerminationSubscriber.class).toInstance(new MemoryTerminationSubscriber());
+        bind(TerminationSubscriber.class).to(RabbitMQTerminationSubscriber.class);
         bind(TaskManager.class).to(EventSourcingTaskManager.class);
         bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class);
     }
diff --git a/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
new file mode 100644
index 0000000..a3a8bb1
--- /dev/null
+++ b/server/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -0,0 +1,146 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.task.eventsourcing.distributed;
+
+import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.UUID;
+
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
+import org.apache.james.backend.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
+import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.task.eventsourcing.TerminationSubscriber;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+import reactor.core.Disposable;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.UnicastProcessor;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
+    private static final Integer MAX_CHANNELS_NUMBER = 1;
+    private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
+    private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
+    private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
+
+    private final JsonEventSerializer serializer;
+    private final Mono<Connection> connectionMono;
+    private final ReactorRabbitMQChannelPool channelPool;
+    private final String queueName;
+    private UnicastProcessor<OutboundMessage> sendQueue;
+    private DirectProcessor<Event> listener;
+    private Disposable sendQueueHandle;
+    private Disposable listenQueueHandle;
+
+    @Inject
+    public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) {
+        this.serializer = serializer;
+        this.connectionMono = simpleConnectionPool.getResilientConnection();
+        this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
+        this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
+    }
+
+    public void start() {
+        Sender sender = RabbitFlux.createSender(new SenderOptions()
+            .connectionMono(connectionMono)
+            .channelPool(channelPool)
+            .resourceManagementChannelMono(
+                connectionMono.map(Throwing.function(Connection::createChannel)).cache()));
+
+        sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
+        sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
+        sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName)).block();
+        sendQueue = UnicastProcessor.create();
+        sendQueueHandle = sender
+            .send(sendQueue)
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+
+        Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        listener = DirectProcessor.create();
+        listenQueueHandle = receiver
+            .consumeAutoAck(queueName)
+            .subscribeOn(Schedulers.elastic())
+            .concatMap(this::toEvent)
+            .subscribe(listener::onNext);
+    }
+
+    @Override
+    public void addEvent(Event event) {
+        try {
+            byte[] payload = serializer.serialize(event).getBytes(StandardCharsets.UTF_8);
+            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build();
+            OutboundMessage message = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
+            sendQueue.onNext(message);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Publisher<Event> listenEvents() {
+        return listener
+            .share();
+    }
+
+    private Mono<Event> toEvent(Delivery delivery) {
+        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+        try {
+            Event event = serializer.deserialize(message);
+            return Mono.just(event);
+        } catch (Exception e) {
+            LOGGER.error("Unable to deserialize '{}'", message, e);
+            return Mono.empty();
+        }
+    }
+
+    @Override
+    @PreDestroy
+    public void close() {
+        Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
+        Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
+        channelPool.close();
+    }
+}
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index ced6652..5db982d 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
+import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
@@ -48,7 +49,6 @@ import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.Hostname;
-import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
@@ -76,6 +76,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     private static final Hostname HOSTNAME = new Hostname("foo");
     private static final Hostname HOSTNAME_2 = new Hostname("bar");
     private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet());
+    private static final JsonEventSerializer EVENT_SERIALIZER = new JsonEventSerializer(MODULES);
 
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
         CassandraModule.aggregateModules(
@@ -104,17 +105,19 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     public EventSourcingTaskManager taskManager() {
-        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME);
+        return taskManager(HOSTNAME);
     }
 
     private EventSourcingTaskManager taskManager(Hostname hostname) {
-        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, new MemoryTerminationSubscriber());
+        RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), EVENT_SERIALIZER);
+        terminationSubscriber.start();
+        return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
     }
 
     @Test
     void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
         try (EventSourcingTaskManager taskManager1 = taskManager();
-             EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
+             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             TaskId taskId = taskManager1.submit(new CompletedTask());
             Awaitility.await()
                 .atMost(Duration.FIVE_SECONDS)
@@ -142,7 +145,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
         try (EventSourcingTaskManager taskManager1 = taskManager();
-             EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
+             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
 
             taskManager1.submit(new MemoryReferenceTask(() -> {
                 waitingForFirstTaskLatch.await();
@@ -164,7 +167,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
         try (EventSourcingTaskManager taskManager1 = taskManager()) {
             Thread.sleep(100);
-            try (EventSourcingTaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME_2)) {
+            try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
 
                 TaskId taskId = taskManager2.submit(new CompletedTask());
 
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
new file mode 100644
index 0000000..fe8690d
--- /dev/null
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -0,0 +1,73 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.task.eventsourcing.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
+import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.task.eventsourcing.TerminationSubscriber;
+import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
+
+import com.github.steveash.guavate.Guavate;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
+    private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer();
+    private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet());
+    private static final JsonEventSerializer SERIALIZER = new JsonEventSerializer(MODULES);
+
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
+
+    @Override
+    public TerminationSubscriber subscriber() {
+        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitConnectionPool(), SERIALIZER);
+        subscriber.start();
+        return subscriber;
+    }
+
+    @Test
+    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() {
+        TerminationSubscriber subscriber1 = subscriber();
+        TerminationSubscriber subscriber2 = subscriber();
+
+        sendEvents(subscriber1, COMPLETED_EVENT);
+
+        List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2)
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(this::collectEvents)
+            .collectList()
+            .block();
+        assertThat(listenedEvents).hasSize(2);
+        assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT);
+        assertThat(listenedEvents.get(1)).containsExactly(COMPLETED_EVENT);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message