james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adup...@apache.org
Subject [4/7] james-project git commit: JAMES-2545 Move RabbitMQ tests and utils to backend package
Date Wed, 12 Sep 2018 08:03:26 GMT
JAMES-2545 Move RabbitMQ tests and utils to backend package


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/cc72f881
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/cc72f881
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/cc72f881

Branch: refs/heads/master
Commit: cc72f8810ffcf4af6de7ee6260ad853389e490ef
Parents: bf95a14
Author: Benoit Tellier <btellier@linagora.com>
Authored: Mon Sep 10 11:12:51 2018 +0700
Committer: Antoine Duprat <aduprat@linagora.com>
Committed: Wed Sep 12 10:01:12 2018 +0200

----------------------------------------------------------------------
 backends-common/rabbitmq/pom.xml                |  32 ++
 .../DockerClusterRabbitMQExtension.java         | 130 ++++++++
 .../james/backend/mailqueue/DockerRabbitMQ.java | 167 ++++++++++
 .../mailqueue/DockerRabbitMQExtension.java      |  53 +++
 .../mailqueue/DockerRabbitMQExtensionTest.java  |  50 +++
 .../backend/mailqueue/InMemoryConsumer.java     |  61 ++++
 .../backend/mailqueue/RabbitMQClusterTest.java  | 294 +++++++++++++++++
 .../backend/mailqueue/RabbitMQFixture.java      |  50 +++
 .../james/backend/mailqueue/RabbitMQTest.java   | 330 +++++++++++++++++++
 .../backend/mailqueue/RabbitMQWaitStrategy.java |  67 ++++
 .../ReusableDockerRabbitMQExtension.java        |  59 ++++
 .../src/test/resources/logback-test.xml         |  17 +
 pom.xml                                         |   6 +
 server/queue/queue-rabbitmq/pom.xml             |  10 +
 .../DockerClusterRabbitMQExtension.java         | 130 --------
 .../james/queue/rabbitmq/DockerRabbitMQ.java    | 167 ----------
 .../queue/rabbitmq/DockerRabbitMQExtension.java |  53 ---
 .../rabbitmq/DockerRabbitMQExtensionTest.java   |  50 ---
 .../james/queue/rabbitmq/InMemoryConsumer.java  |  61 ----
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 294 -----------------
 .../james/queue/rabbitmq/RabbitMQFixture.java   |  50 ---
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |   2 +
 .../james/queue/rabbitmq/RabbitMQTest.java      | 330 -------------------
 .../queue/rabbitmq/RabbitMQWaitStrategy.java    |  67 ----
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  |   2 +
 .../ReusableDockerRabbitMQExtension.java        |  59 ----
 .../src/test/resources/logback-test.xml         |   2 +-
 27 files changed, 1331 insertions(+), 1262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index ee842f2..d518753 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -32,14 +32,41 @@
 
     <dependencies>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-util</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-configuration</groupId>
             <artifactId>commons-configuration</artifactId>
         </dependency>
         <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+        </dependency>
+        <dependency>
             <groupId>nl.jqno.equalsverifier</groupId>
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
@@ -64,6 +91,11 @@
             <artifactId>jcl-over-slf4j</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java
new file mode 100644
index 0000000..dd382d0
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerClusterRabbitMQExtension.java
@@ -0,0 +1,130 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.util.Runnables;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+import com.rabbitmq.client.Address;
+
+/**
+ * JUnit 5 extension that runs a three-node RabbitMQ cluster in Docker around each test.
+ *
+ * <p>The running {@link DockerRabbitMQCluster} can be injected as a test method parameter.
+ */
+public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+    public static final String RABBIT_1 = "rabbit1";
+    public static final String RABBIT_2 = "rabbit2";
+    public static final String RABBIT_3 = "rabbit3";
+    private DockerRabbitMQCluster cluster;
+    private Network network;
+
+    /**
+     * Creates a dedicated Docker network, starts the three nodes in parallel with a shared
+     * Erlang cookie, makes nodes 2 and 3 join node 1, then starts their applications again.
+     */
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString();
+
+        network = Network.NetworkImpl.builder()
+            .enableIpv6(false)
+            .createNetworkCmdModifiers(ImmutableList.of())
+            .build();
+
+        DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
+        DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
+        DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
+
+        // Register the cluster before starting anything so that afterEach can still stop
+        // the containers when one of the following steps fails.
+        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+
+        Runnables.runParallel(
+            rabbitMQ1::start,
+            rabbitMQ2::start,
+            rabbitMQ3::start);
+
+        Runnables.runParallel(
+            Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)),
+            Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1)));
+
+        Runnables.runParallel(
+            Throwing.runnable(rabbitMQ2::startApp),
+            Throwing.runnable(rabbitMQ3::startApp));
+    }
+
+    /**
+     * Stops the cluster and closes the network.
+     *
+     * <p>The network is closed even when stopping the cluster throws, and anything that
+     * was never created (because {@code beforeEach} failed early) is skipped.
+     */
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        DockerRabbitMQCluster clusterToStop = cluster;
+        Network networkToClose = network;
+        // Forget both resources first so a later teardown never touches them twice.
+        cluster = null;
+        network = null;
+
+        try {
+            if (clusterToStop != null) {
+                clusterToStop.stop();
+            }
+        } finally {
+            if (networkToClose != null) {
+                networkToClose.close();
+            }
+        }
+    }
+
+    /** Only parameters of type {@link DockerRabbitMQCluster} are supported. */
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
+    }
+
+    /** Returns the cluster created by the latest {@code beforeEach} call. */
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return cluster;
+    }
+
+    /** Holder giving access to the three RabbitMQ nodes of the cluster. */
+    public static class DockerRabbitMQCluster {
+
+        private final DockerRabbitMQ rabbitMQ1;
+        private final DockerRabbitMQ rabbitMQ2;
+        private final DockerRabbitMQ rabbitMQ3;
+
+        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+            this.rabbitMQ1 = rabbitMQ1;
+            this.rabbitMQ2 = rabbitMQ2;
+            this.rabbitMQ3 = rabbitMQ3;
+        }
+
+        /** Stops the three nodes in parallel; a failure to stop one node is ignored. */
+        public void stop() {
+            Runnables.runParallel(
+                Throwing.runnable(rabbitMQ1::stop).orDoNothing(),
+                Throwing.runnable(rabbitMQ2::stop).orDoNothing(),
+                Throwing.runnable(rabbitMQ3::stop).orDoNothing());
+        }
+
+        public DockerRabbitMQ getRabbitMQ1() {
+            return rabbitMQ1;
+        }
+
+        public DockerRabbitMQ getRabbitMQ2() {
+            return rabbitMQ2;
+        }
+
+        public DockerRabbitMQ getRabbitMQ3() {
+            return rabbitMQ3;
+        }
+
+        /** Returns the host/mapped-port address of each node, in node order. */
+        public ImmutableList<Address> getAddresses() {
+            return ImmutableList.of(
+                new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
+                new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
+                new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java
new file mode 100644
index 0000000..6c817cf
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQ.java
@@ -0,0 +1,167 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.james.util.docker.Images;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * Test helper wrapping a RabbitMQ Docker container.
+ *
+ * <p>Exposes the AMQP and admin ports, the default credentials, and {@code rabbitmqctl}
+ * based operations used to build or reset a cluster.
+ */
+public class DockerRabbitMQ {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
+
+    private static final String DEFAULT_RABBIT_NODE = "my-rabbit";
+    private static final int DEFAULT_RABBITMQ_PORT = 5672;
+    private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672;
+    private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
+    private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
+    private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE";
+    private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME";
+
+    private final GenericContainer<?> container;
+    private final Optional<String> nodeName;
+
+    /**
+     * Creates an instance meant to be part of a cluster.
+     *
+     * @param hostName container name and host name
+     * @param erlangCookie value passed as {@code RABBITMQ_ERLANG_COOKIE}
+     * @param nodeName value passed as {@code RABBITMQ_NODENAME}, also returned by {@link #node()}
+     * @param network Docker network the container is attached to, must not be null
+     */
+    public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) {
+        return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName),
+            Optional.of(network));
+    }
+
+    /** Creates a standalone instance: random container name, default host name, no cookie, no node name. */
+    public static DockerRabbitMQ withoutCookie() {
+        return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    }
+
+    @SuppressWarnings("resource")
+    private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
+        container = new GenericContainer<>(Images.RABBITMQ)
+                // Only generate a random name when no host name was supplied.
+                .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElseGet(this::randomName)))
+                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
+                .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT)
+                .waitingFor(new WaitAllStrategy()
+                    .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT))
+                    .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this)))
+                .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()))
+                // The mnesia directory is mounted as tmpfs.
+                .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig()
+                    .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m")));
+        net.ifPresent(container::withNetwork);
+        erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie));
+        nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name));
+        this.nodeName = nodeName;
+    }
+
+    private String randomName() {
+        return UUID.randomUUID().toString();
+    }
+
+    public String getHostIp() {
+        return container.getContainerIpAddress();
+    }
+
+    /** Returns the host port mapped to the container AMQP port. */
+    public Integer getPort() {
+        return container.getMappedPort(DEFAULT_RABBITMQ_PORT);
+    }
+
+    /** Returns the host port mapped to the container admin port. */
+    public Integer getAdminPort() {
+        return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT);
+    }
+
+    public String getUsername() {
+        return DEFAULT_RABBITMQ_USERNAME;
+    }
+
+    public String getPassword() {
+        return DEFAULT_RABBITMQ_PASSWORD;
+    }
+
+    /** Builds a new connection factory configured with this container host, port and credentials. */
+    public ConnectionFactory connectionFactory() {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(getHostIp());
+        connectionFactory.setPort(getPort());
+        connectionFactory.setUsername(getUsername());
+        connectionFactory.setPassword(getPassword());
+        return connectionFactory;
+    }
+
+    public void start() {
+        container.start();
+    }
+
+    public void stop() {
+        container.stop();
+    }
+
+    /**
+     * Restarts the underlying container through the Docker client.
+     *
+     * <p>The restart command has to be executed explicitly: building it alone does not
+     * send anything to Docker.
+     */
+    public void restart() {
+        DockerClientFactory.instance().client()
+            .restartContainerCmd(container.getContainerId())
+            .exec();
+    }
+
+    public GenericContainer<?> container() {
+        return container;
+    }
+
+    /**
+     * Returns the RabbitMQ node name this instance was created with.
+     *
+     * @throws java.util.NoSuchElementException when no node name was supplied, which is
+     *         the case for instances created by {@link #withoutCookie()}
+     */
+    public String node() {
+        return nodeName.orElseThrow(() -> new java.util.NoSuchElementException(
+            "No node name configured: this instance was not created with withCookieAndNodeName"));
+    }
+
+    /**
+     * Stops the RabbitMQ application of this node and makes it join the cluster of the given node.
+     * The application is left stopped; call {@link #startApp()} afterwards.
+     */
+    public void join(DockerRabbitMQ rabbitMQ) throws Exception {
+        stopApp();
+        joinCluster(rabbitMQ);
+    }
+
+    private void stopApp() throws java.io.IOException, InterruptedException {
+        String stdout = container()
+            .execInContainer("rabbitmqctl", "stop_app")
+            .getStdout();
+        LOGGER.debug("stop_app: {}", stdout);
+    }
+
+    private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException {
+        String stdout = container()
+            .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
+            .getStdout();
+        LOGGER.debug("join_cluster: {}", stdout);
+    }
+
+    /** Runs {@code rabbitmqctl start_app} inside the container and logs its output. */
+    public void startApp() throws Exception {
+        String stdout = container()
+                .execInContainer("rabbitmqctl", "start_app")
+                .getStdout();
+        LOGGER.debug("start_app: {}", stdout);
+    }
+
+    /** Stops the application, runs {@code rabbitmqctl reset}, then starts the application again. */
+    public void reset() throws Exception {
+        stopApp();
+
+        String stdout = container()
+            .execInContainer("rabbitmqctl", "reset")
+            .getStdout();
+        LOGGER.debug("reset: {}", stdout);
+
+        startApp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java
new file mode 100644
index 0000000..ea11f81
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtension.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+    private DockerRabbitMQ rabbitMQ;
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        rabbitMQ = DockerRabbitMQ.withoutCookie();
+        rabbitMQ.start();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        rabbitMQ.stop();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return rabbitMQ;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java
new file mode 100644
index 0000000..aaabc5b
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/DockerRabbitMQExtensionTest.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+public class DockerRabbitMQExtensionTest {
+
+    private ConnectionFactory connectionFactory;
+
+    @BeforeEach
+    public void setup(DockerRabbitMQ rabbitMQ) {
+        connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(rabbitMQ.getHostIp());
+        connectionFactory.setPort(rabbitMQ.getPort());
+        connectionFactory.setUsername(rabbitMQ.getUsername());
+        connectionFactory.setPassword(rabbitMQ.getPassword());
+    }
+
+    @Test
+    public void containerShouldBeUp() throws Exception {
+        try (Connection connection = connectionFactory.newConnection()) {
+            assertThat(connection.isOpen()).isTrue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java
new file mode 100644
index 0000000..e6fe021
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/InMemoryConsumer.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+
+/**
+ * RabbitMQ consumer that keeps every delivered message in memory.
+ *
+ * <p>Each message body is decoded as a UTF-8 string and parsed as an integer. An optional
+ * {@link Operation} is run at the beginning of every delivery, before the body is parsed.
+ */
+public class InMemoryConsumer extends DefaultConsumer {
+
+    /** Action executed on each delivery before the message is recorded. */
+    @FunctionalInterface
+    interface Operation {
+        void perform();
+    }
+
+    private final ConcurrentLinkedQueue<Integer> messages = new ConcurrentLinkedQueue<>();
+    private final Operation operation;
+
+    /** Creates a consumer that performs no extra action on delivery. */
+    public InMemoryConsumer(Channel channel) {
+        this(channel, () -> { });
+    }
+
+    /**
+     * @param channel channel this consumer is associated with
+     * @param operation action run at the start of each delivery
+     */
+    public InMemoryConsumer(Channel channel, Operation operation) {
+        super(channel);
+        this.operation = operation;
+    }
+
+    /** Runs the operation, then parses the body as an integer and stores it. */
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+        operation.perform();
+        String text = new String(body, StandardCharsets.UTF_8);
+        messages.add(Integer.valueOf(text));
+    }
+
+    /** Returns the live queue of messages consumed so far, in arrival order. */
+    public Queue<Integer> getConsumedMessages() {
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java
new file mode 100644
index 0000000..5467840
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQClusterTest.java
@@ -0,0 +1,294 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.james.backend.mailqueue.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerClusterRabbitMQExtension.class)
+class RabbitMQClusterTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);
+
+    private static final String QUEUE = "queue";
+
+    // Checks that exchanges, queues and messages declared through one node are usable from another.
+    @Nested
+    class ClusterSharing {
+
+        private ConnectionFactory node1ConnectionFactory;
+        private ConnectionFactory node2ConnectionFactory;
+        private Connection node1Connection;
+        private Connection node2Connection;
+        private Channel node1Channel;
+        private Channel node2Channel;
+
+        // Opens one connection and one channel on each of the first two nodes.
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+            node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+            node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+            node1Connection = node1ConnectionFactory.newConnection();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node1Channel = node1Connection.createChannel();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        // Channels are closed before the connections that own them.
+        @AfterEach
+        void tearDown() {
+            closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+        }
+
+        @Test
+        void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+            String clusterStatus = cluster.getRabbitMQ1()
+                .container()
+                .execInContainer("rabbitmqctl", "cluster_status")
+                .getStdout();
+
+            assertThat(clusterStatus).contains(
+                DockerClusterRabbitMQExtension.RABBIT_1,
+                DockerClusterRabbitMQExtension.RABBIT_2,
+                DockerClusterRabbitMQExtension.RABBIT_3);
+        }
+
+        @Test
+        void queuesShouldBeShared() throws Exception {
+            // Everything is declared through node 1 ...
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of());
+            node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            publishOnNode1(nbMessages);
+
+            // ... and consumed through node 2.
+            consumeOnNode2AndVerify(nbMessages);
+        }
+
+        @Test
+        void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
+            // The exchange lives on node 1 while the queue is declared and bound through node 2.
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of());
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            publishOnNode1(nbMessages);
+
+            consumeOnNode2AndVerify(nbMessages);
+        }
+
+        // Publishes the integers [0, nbMessages) as UTF-8 text through the node 1 channel.
+        private void publishOnNode1(int nbMessages) {
+            IntStream.range(0, nbMessages)
+                .mapToObj(String::valueOf)
+                .map(RabbitMQClusterTest.this::asBytes)
+                .forEach(Throwing.consumer(
+                    payload -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, payload)));
+        }
+
+        // Consumes QUEUE through the node 2 channel and checks every published integer was received.
+        private void consumeOnNode2AndVerify(int nbMessages) throws IOException {
+            InMemoryConsumer node2Consumer = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, node2Consumer);
+
+            awaitAtMostOneMinute.until(() -> node2Consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedPayloads = IntStream.range(0, nbMessages)
+                .boxed()
+                .collect(Guavate.toImmutableList());
+            assertThat(node2Consumer.getConsumedMessages()).containsOnlyElementsOf(expectedPayloads);
+        }
+
+    }
+
+    // Exercises publishing and consuming while one node of the cluster is stopped.
+    @Nested
+    class ClusterNodesFailure {
+
+        private ConnectionFactory node1ConnectionFactory;
+        // Connection opened with the addresses of the whole cluster rather than a single node.
+        private Connection resilientConnection;
+        private Channel resilientChannel;
+        private Connection node2Connection;
+        private Channel node2Channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+            node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+            resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses());
+            resilientChannel = resilientConnection.createChannel();
+            ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        // Releases everything opened in setup: channels first, then the connections owning them.
+        // The node 2 resources must be closed too, otherwise they leak after every test.
+        @AfterEach
+        void tearDown() {
+            closeQuietly(resilientChannel, node2Channel, resilientConnection, node2Connection);
+        }
+
+        @Disabled("JAMES-2334 For some reason, we are unable to recover topology when reconnecting. " +
+            "See https://github.com/rabbitmq/rabbitmq-server/issues/959")
+        @Test
+        void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception {
+            resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            // First half of the messages is published while every node is up.
+            int nbMessages = 20;
+            int firstBatchSize = nbMessages / 2;
+            IntStream.range(0, firstBatchSize)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer);
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);
+
+            cluster.getRabbitMQ1().stop();
+
+            // Second half is published after node 1 went down, retrying each message until it succeeds.
+            IntStream.range(firstBatchSize, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(this::tryPublishWithRetry);
+
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        // Retries the publication once per second, for at most one minute, until it succeeds.
+        private void tryPublishWithRetry(byte[] bytes) {
+            Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes));
+        }
+
+        // Returns true when the publication went through, false (after logging) on any failure.
+        private boolean tryPublish(byte[] bytes) {
+            try {
+                resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes);
+                return true;
+            } catch (Exception e) {
+                LOGGER.error("failed publish", e);
+                return false;
+            }
+        }
+
+        @Test
+        void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception {
+            // Capture the factory and the addresses before stopping node 3.
+            ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory();
+            ImmutableList<Address> clusterAddresses = cluster.getAddresses();
+            cluster.getRabbitMQ3().stop();
+
+            try (Connection connection = node3ConnectionFactory.newConnection(clusterAddresses);
+                 Channel channel = connection.createChannel()) {
+
+                channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+                channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+                channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                int nbMessages = 10;
+                IntStream.range(0, nbMessages)
+                    .mapToObj(i -> asBytes(String.valueOf(i)))
+                    .forEach(Throwing.consumer(
+                        bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                InMemoryConsumer consumer = new InMemoryConsumer(channel);
+                channel.basicConsume(QUEUE, consumer);
+
+                awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+                List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+                assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+            }
+        }
+
+        @Test
+        void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception {
+            node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            // The consumer callback stops node 1 once half of the messages have been delivered.
+            AtomicInteger counter = new AtomicInteger(0);
+            InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
+                () -> stopWhenHalfProcessed(cluster, nbMessages, counter));
+            resilientChannel.basicConsume(QUEUE, consumer);
+
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        // Counts deliveries and stops node 1 exactly once, when the counter reaches nbMessages / 2.
+        private void stopWhenHalfProcessed(DockerRabbitMQCluster cluster, int nbMessages, AtomicInteger counter) {
+            if (counter.incrementAndGet() == nbMessages / 2) {
+                cluster.getRabbitMQ1().stop();
+            }
+        }
+
+    }
+
+    // Closes each resource in the given order, ignoring individual failures.
+    private void closeQuietly(AutoCloseable... closeables) {
+        for (AutoCloseable closeable : closeables) {
+            closeQuietly(closeable);
+        }
+    }
+
+    /**
+     * Closes a resource on a best-effort basis: a failure never propagates to the caller.
+     *
+     * @param closeable resource to close; {@code null} is accepted and skipped
+     */
+    private void closeQuietly(AutoCloseable closeable) {
+        if (closeable == null) {
+            // Nothing to close, e.g. the field was never assigned.
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            // Deliberately not rethrown: cleanup must not mask the outcome of the test itself.
+            LOGGER.debug("Ignoring failure while closing {}", closeable.getClass().getSimpleName(), e);
+        }
+    }
+
+    // Encodes the message as UTF-8, the charset InMemoryConsumer decodes with.
+    private byte[] asBytes(String message) {
+        byte[] utf8Payload = message.getBytes(StandardCharsets.UTF_8);
+        return utf8Payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java
new file mode 100644
index 0000000..8b83a96
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQFixture.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.mailqueue;
+
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_MINUTE;
+
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
+
+import com.rabbitmq.client.AMQP;
+
+/**
+ * Constants and Awaitility conditions shared by the RabbitMQ tests.
+ *
+ * The boolean constants exist so that call sites read as {@code DURABLE} or {@code !EXCLUSIVE}
+ * instead of bare {@code true}/{@code false} arguments.
+ */
+public class RabbitMQFixture {
+    public static final boolean DURABLE = true;
+    public static final boolean AUTO_ACK = true;
+    // Passed where the client expects message properties and none are needed.
+    public static final AMQP.BasicProperties NO_PROPERTIES = null;
+    public static final String EXCHANGE_NAME = "exchangeName";
+    public static final String ROUTING_KEY = "routingKey";
+    // Exchange type name handed to exchangeDeclare.
+    public static final String DIRECT = "direct";
+    public static final boolean EXCLUSIVE = true;
+    public static final boolean AUTO_DELETE = true;
+    public static final String WORK_QUEUE = "workQueue";
+
+    // Shared by every test class, hence final: no test may reassign them for the others.
+    public static final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    // Polls every 100 ms, waiting 100 ms before the first evaluation.
+    public static final ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    // Same polling as calmlyAwait, giving up after one minute.
+    public static final ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java
new file mode 100644
index 0000000..f54ba25
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQTest.java
@@ -0,0 +1,330 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.AUTO_DELETE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.DIRECT;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.DURABLE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.EXCLUSIVE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.WORK_QUEUE;
+import static org.apache.james.backend.mailqueue.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+class RabbitMQTest {
+
+    // Scenarios using a single connection and channel against one broker.
+    @Nested
+    class SingleConsumerTest {
+
+        private ConnectionFactory connectionFactory;
+        private Connection connection;
+        private Channel channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
+            connectionFactory = rabbitMQ.connectionFactory();
+            connection = connectionFactory.newConnection();
+            channel = connection.createChannel();
+        }
+
+        // The channel is closed before the connection that owns it, as in FourConnections.
+        @AfterEach
+        void tearDown() {
+            closeQuietly(channel, connection);
+        }
+
+        @Test
+        void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception {
+            String queueName = createQueue(channel);
+            publishAMessage(channel);
+            awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+        }
+
+        @Test
+        void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception {
+            String queueName = createQueue(channel);
+            publishAMessage(channel);
+
+            rabbitMQ.restart();
+
+            awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ));
+            assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull();
+        }
+
+        // Probes the broker by opening a throw-away connection; the probe is closed right away
+        // so that repeated polling does not accumulate open connections.
+        private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) {
+            try (Connection probe = rabbitMQ.connectionFactory().newConnection()) {
+                return true;
+            } catch (Exception e) {
+                // Broker not reachable yet: report "not restarted" so the caller polls again.
+                return false;
+            }
+        }
+
+        // Declares the exchange plus a server-named queue bound to it, and returns the queue name.
+        private String createQueue(Channel channel) throws IOException {
+            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            String queueName = channel.queueDeclare().getQueue();
+            channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
+            return queueName;
+        }
+
+        private void publishAMessage(Channel channel) throws IOException {
+            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!"));
+        }
+
+        // Returns true when a message could be fetched from the queue, false when the queue is
+        // empty or the fetch failed.
+        private Boolean messageReceived(Channel channel, String queueName) {
+            try {
+                return channel.basicGet(queueName, !AUTO_ACK) != null;
+            } catch (Exception e) {
+                return false;
+            }
+        }
+    }
+
+    // Scenarios involving four independent connections (one channel each) to the same broker.
+    @Nested
+    class FourConnections {
+
+        private ConnectionFactory connectionFactory1;
+        private ConnectionFactory connectionFactory2;
+        private ConnectionFactory connectionFactory3;
+        private ConnectionFactory connectionFactory4;
+        private Connection connection1;
+        private Connection connection2;
+        private Connection connection3;
+        private Connection connection4;
+        private Channel channel1;
+        private Channel channel2;
+        private Channel channel3;
+        private Channel channel4;
+
+        @BeforeEach
+        void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
+            connectionFactory1 = rabbitMQ.connectionFactory();
+            connectionFactory2 = rabbitMQ.connectionFactory();
+            connectionFactory3 = rabbitMQ.connectionFactory();
+            connectionFactory4 = rabbitMQ.connectionFactory();
+            connection1 = connectionFactory1.newConnection();
+            connection2 = connectionFactory2.newConnection();
+            connection3 = connectionFactory3.newConnection();
+            connection4 = connectionFactory4.newConnection();
+            channel1 = connection1.createChannel();
+            channel2 = connection2.createChannel();
+            channel3 = connection3.createChannel();
+            channel4 = connection4.createChannel();
+        }
+
+        // Channels are closed before the connections that own them.
+        @AfterEach
+        void tearDown() {
+            closeQuietly(
+                channel1, channel2, channel3, channel4,
+                connection1, connection2, connection3, connection4);
+        }
+
+        @Nested
+        class BroadCast {
+
+            // In the following case, each consumer will receive all the messages produced by
+            // the producer.
+            // To do so, each consumer binds its own queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
+                // Declare a single exchange and three queues attached to it.
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+                String queue3 = channel3.queueDeclare().getQueue();
+                channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+                String queue4 = channel4.queueDeclare().getQueue();
+                channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
+
+                // The publisher will produce 10 messages.
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                // 3 consumers x 10 messages.
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
+
+                ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
+                // Check that every subscriber has received all the messages.
+                assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+            }
+        }
+
+        @Nested
+        class WorkQueue {
+
+            // In the following case, consumers will receive the messages produced by the
+            // producer but will share them.
+            // To do so, we will bind a single queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheWorkQueueCase() throws Exception {
+                int nbMessages = 100;
+
+                // Declare the exchange and a single queue attached to it.
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                // Publisher will produce 100 messages.
+                IntStream.range(0, nbMessages)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(WORK_QUEUE, consumer2);
+                channel3.basicConsume(WORK_QUEUE, consumer3);
+                channel4.basicConsume(WORK_QUEUE, consumer4);
+
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages);
+
+                ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+
+                // Taken together, the three consumers must have seen every message.
+                assertThat(
+                    Iterables.concat(
+                        consumer2.getConsumedMessages(),
+                        consumer3.getConsumedMessages(),
+                        consumer4.getConsumedMessages()))
+                    .containsOnlyElementsOf(expectedResult);
+            }
+
+        }
+
+        @Nested
+        class Routing {
+            @Test
+            void rabbitMQShouldSupportRouting() throws Exception {
+                String conversation1 = "c1";
+                String conversation2 = "c2";
+                String conversation3 = "c3";
+                String conversation4 = "c4";
+
+                // Declare the exchange; each channel then declares its own queue, bound to
+                // two conversations (used as routing keys).
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue1 = channel1.queueDeclare().getQueue();
+                // 1 will follow conversation 1 and 2
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation1);
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation2);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                // 2 will follow conversation 2 and 3
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation2);
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation3);
+
+                String queue3 = channel3.queueDeclare().getQueue();
+                // 3 will follow conversation 3 and 4
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation3);
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation4);
+
+                String queue4 = channel4.queueDeclare().getQueue();
+                // 4 will follow conversation 1 and 4
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation1);
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation4);
+
+                channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1"));
+                channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2"));
+                channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3"));
+                channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4"));
+
+                InMemoryConsumer consumer1 = new InMemoryConsumer(channel1);
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel1.basicConsume(queue1, consumer1);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
+
+                // 4 messages, each routed to the 2 queues following its conversation.
+                awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8);
+
+                assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2);
+                assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3);
+                assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4);
+                assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4);
+            }
+        }
+
+        // Total number of messages recorded so far across the given consumers.
+        private long countReceivedMessages(InMemoryConsumer... consumers) {
+            return Arrays.stream(consumers)
+                .map(InMemoryConsumer::getConsumedMessages)
+                .mapToLong(Queue::size)
+                .sum();
+        }
+
+    }
+
+    // Closes each resource in the given order, ignoring individual failures.
+    private void closeQuietly(AutoCloseable... closeables) {
+        for (AutoCloseable closeable : closeables) {
+            closeQuietly(closeable);
+        }
+    }
+
+    // Best-effort close: a null resource or a failing close() is silently skipped.
+    private void closeQuietly(AutoCloseable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (Exception ignored) {
+            // cleanup failures are deliberately not propagated
+        }
+    }
+
+    // Encodes the message as UTF-8, the charset InMemoryConsumer decodes with.
+    private byte[] asBytes(String message) {
+        byte[] utf8Payload = message.getBytes(StandardCharsets.UTF_8);
+        return utf8Payload;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java
new file mode 100644
index 0000000..2f397ce
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/RabbitMQWaitStrategy.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.mailqueue;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
+
+import com.google.common.primitives.Ints;
+import com.rabbitmq.client.Connection;
+
+/** Testcontainers wait strategy: RabbitMQ counts as ready once a client connection to it can be opened. */
+public class RabbitMQWaitStrategy implements WaitStrategy {
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
+    private final DockerRabbitMQ rabbitMQ;
+    private final Duration timeout;
+
+    /** Creates a strategy for {@code rabbitMQ} that uses the one-minute {@code DEFAULT_TIMEOUT}. */
+    public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) {
+        return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT);
+    }
+
+    public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) {
+        this.rabbitMQ = rabbitMQ;
+        this.timeout = timeout;
+    }
+
+    @Override
+    public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
+        // NOTE(review): only whole seconds are used; Duration#getSeconds drops any sub-second part.
+        Unreliables.retryUntilTrue(Ints.checkedCast(timeout.getSeconds()), TimeUnit.SECONDS, this::isConnected);
+    }
+
+    /** Opens a throwaway connection and reports whether it is open; it is closed before returning. */
+    private Boolean isConnected() throws IOException, TimeoutException {
+        try (Connection probe = rabbitMQ.connectionFactory().newConnection()) {
+            return probe.isOpen();
+        }
+    }
+
+    @Override
+    public WaitStrategy withStartupTimeout(Duration startupTimeout) {
+        return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout); // new instance; this one is left untouched
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java
new file mode 100644
index 0000000..12c385b
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/mailqueue/ReusableDockerRabbitMQExtension.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.backend.mailqueue;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+public class ReusableDockerRabbitMQExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
+
+    private DockerRabbitMQ rabbitMQ;
+
+    @Override
+    public void beforeAll(ExtensionContext context) {
+        rabbitMQ = DockerRabbitMQ.withoutCookie();
+        rabbitMQ.start();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        rabbitMQ.reset();
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) {
+        rabbitMQ.stop();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return rabbitMQ;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/backends-common/rabbitmq/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/resources/logback-test.xml b/backends-common/rabbitmq/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..d1b28c0
--- /dev/null
+++ b/backends-common/rabbitmq/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+                <immediateFlush>false</immediateFlush>
+                <encoder>
+                        <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
+                </encoder>
+        </appender>
+
+        <logger name="org.apache.james" level="WARN"/>
+
+        <root level="ERROR">
+                <appender-ref ref="CONSOLE" />
+        </root>
+
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e552d7d..79669fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -707,6 +707,12 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>apache-james-backends-rabbitmq</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>apache-james-spamassassin</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 2884684..3a70738 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -38,6 +38,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.james</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.james</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-cassandra</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
deleted file mode 100644
index 5e24840..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.james.util.Runnables;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.testcontainers.containers.Network;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-import com.rabbitmq.client.Address;
-
-public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
-
-    public static final String RABBIT_1 = "rabbit1";
-    public static final String RABBIT_2 = "rabbit2";
-    public static final String RABBIT_3 = "rabbit3";
-    private DockerRabbitMQCluster cluster;
-    private Network network;
-
-    @Override
-    public void beforeEach(ExtensionContext context) {
-        String cookie = Hashing.sha256().hashString("secret cookie here", StandardCharsets.UTF_8).toString();
-
-        network = Network.NetworkImpl.builder()
-            .enableIpv6(false)
-            .createNetworkCmdModifiers(ImmutableList.of())
-            .build();
-
-        DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
-        DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
-        DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
-
-        Runnables.runParallel(
-            rabbitMQ1::start,
-            rabbitMQ2::start,
-            rabbitMQ3::start);
-
-        Runnables.runParallel(
-            Throwing.runnable(() -> rabbitMQ2.join(rabbitMQ1)),
-            Throwing.runnable(() -> rabbitMQ3.join(rabbitMQ1)));
-
-
-
-        Runnables.runParallel(
-            Throwing.runnable(rabbitMQ2::startApp),
-            Throwing.runnable(rabbitMQ3::startApp));
-
-        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) throws Exception {
-        cluster.stop();
-        network.close();
-    }
-
-    @Override
-    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
-    }
-
-    @Override
-    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return cluster;
-    }
-
-    public static class DockerRabbitMQCluster {
-
-        private final DockerRabbitMQ rabbitMQ1;
-        private final DockerRabbitMQ rabbitMQ2;
-        private final DockerRabbitMQ rabbitMQ3;
-
-        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
-            this.rabbitMQ1 = rabbitMQ1;
-            this.rabbitMQ2 = rabbitMQ2;
-            this.rabbitMQ3 = rabbitMQ3;
-        }
-
-        public void stop() {
-            Runnables.runParallel(
-                Throwing.runnable(rabbitMQ1::stop).orDoNothing(),
-                Throwing.runnable(rabbitMQ2::stop).orDoNothing(),
-                Throwing.runnable(rabbitMQ3::stop).orDoNothing());
-        }
-
-        public DockerRabbitMQ getRabbitMQ1() {
-            return rabbitMQ1;
-        }
-
-        public DockerRabbitMQ getRabbitMQ2() {
-            return rabbitMQ2;
-        }
-
-        public DockerRabbitMQ getRabbitMQ3() {
-            return rabbitMQ3;
-        }
-
-        public ImmutableList<Address> getAddresses() {
-            return ImmutableList.of(
-                new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
-                new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
-                new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
deleted file mode 100644
index bba3315..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import java.util.Optional;
-import java.util.UUID;
-
-import org.apache.james.util.docker.Images;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.DockerClientFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
-
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class DockerRabbitMQ {
-    private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
-
-    private static final String DEFAULT_RABBIT_NODE = "my-rabbit";
-    private static final int DEFAULT_RABBITMQ_PORT = 5672;
-    private static final int DEFAULT_RABBITMQ_ADMIN_PORT = 15672;
-    private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
-    private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
-    private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE";
-    private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME";
-
-    private final GenericContainer<?> container;
-    private final Optional<String> nodeName;
-
-    public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) {
-        return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName),
-            Optional.of(network));
-    }
-
-    public static DockerRabbitMQ withoutCookie() {
-        return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
-    }
-
-    @SuppressWarnings("resource")
-    private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
-        container = new GenericContainer<>(Images.RABBITMQ)
-                .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse(randomName())))
-                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
-                .withExposedPorts(DEFAULT_RABBITMQ_PORT, DEFAULT_RABBITMQ_ADMIN_PORT)
-                .waitingFor(new WaitAllStrategy()
-                    .withStrategy(Wait.forHttp("").forPort(DEFAULT_RABBITMQ_ADMIN_PORT))
-                    .withStrategy(RabbitMQWaitStrategy.withDefaultTimeout(this)))
-                .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()))
-                .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig()
-                    .withTmpFs(ImmutableMap.of("/var/lib/rabbitmq/mnesia", "rw,noexec,nosuid,size=100m")));
-        net.ifPresent(container::withNetwork);
-        erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie));
-        nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name));
-        this.nodeName = nodeName;
-    }
-
-    private String randomName() {
-        return UUID.randomUUID().toString();
-    }
-
-    public String getHostIp() {
-        return container.getContainerIpAddress();
-    }
-
-    public Integer getPort() {
-        return container.getMappedPort(DEFAULT_RABBITMQ_PORT);
-    }
-
-    public Integer getAdminPort() {
-        return container.getMappedPort(DEFAULT_RABBITMQ_ADMIN_PORT);
-    }
-
-    public String getUsername() {
-        return DEFAULT_RABBITMQ_USERNAME;
-    }
-
-    public String getPassword() {
-        return DEFAULT_RABBITMQ_PASSWORD;
-    }
-
-    public ConnectionFactory connectionFactory() {
-        ConnectionFactory connectionFactory = new ConnectionFactory();
-        connectionFactory.setHost(getHostIp());
-        connectionFactory.setPort(getPort());
-        connectionFactory.setUsername(getUsername());
-        connectionFactory.setPassword(getPassword());
-        return connectionFactory;
-    }
-
-    public void start() {
-        container.start();
-    }
-
-    public void stop() {
-        container.stop();
-    }
-
-    public void restart() {
-        DockerClientFactory.instance().client()
-            .restartContainerCmd(container.getContainerId());
-    }
-
-    public GenericContainer<?> container() {
-        return container;
-    }
-
-    public String node() {
-        return nodeName.get();
-    }
-
-    public void join(DockerRabbitMQ rabbitMQ) throws Exception {
-        stopApp();
-        joinCluster(rabbitMQ);
-    }
-
-    private void stopApp() throws java.io.IOException, InterruptedException {
-        String stdout = container()
-            .execInContainer("rabbitmqctl", "stop_app")
-            .getStdout();
-        LOGGER.debug("stop_app: {}", stdout);
-    }
-
-    private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException {
-        String stdout = container()
-            .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
-            .getStdout();
-        LOGGER.debug("join_cluster: {}", stdout);
-    }
-
-    public void startApp() throws Exception {
-        String stdout = container()
-                .execInContainer("rabbitmqctl", "start_app")
-                .getStdout();
-        LOGGER.debug("start_app: {}", stdout);
-    }
-
-    public void reset() throws Exception {
-        stopApp();
-
-        String stdout = container()
-            .execInContainer("rabbitmqctl", "reset")
-            .getStdout();
-        LOGGER.debug("reset: {}", stdout);
-
-        startApp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/cc72f881/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
deleted file mode 100644
index b0cbfd7..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-
-public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
-
-    private DockerRabbitMQ rabbitMQ;
-
-    @Override
-    public void beforeEach(ExtensionContext context) {
-        rabbitMQ = DockerRabbitMQ.withoutCookie();
-        rabbitMQ.start();
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) {
-        rabbitMQ.stop();
-    }
-
-    @Override
-    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
-    }
-
-    @Override
-    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return rabbitMQ;
-    }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message