activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: NO-JIRA speeding up a few tests
Date Tue, 07 May 2019 03:17:38 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e40984  NO-JIRA speeding up a few tests
     new 4c6447ea This closes #2660
0e40984 is described below

commit 0e409841458936e2a190f00539dc362935a5b504
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Mon May 6 11:49:06 2019 -0400

    NO-JIRA speeding up a few tests
---
 .../core/protocol/mqtt/MQTTProtocolManager.java    |   5 +
 .../remoting/server/impl/RemotingServiceImpl.java  |   6 ++
 .../integration/amqp/JMSMessageConsumerTest.java   |   7 +-
 .../client/ConcurrentCreateDeleteProduceTest.java  |   9 +-
 .../tests/integration/client/ConsumerTest.java     |   2 +-
 .../tests/integration/client/LargeMessageTest.java |  43 ++++-----
 .../jms/cluster/JMSFailoverListenerTest.java       |   5 +-
 .../largemessage/LargeMessageTestBase.java         |   4 -
 .../imported/MqttClusterRemoteSubscribeTest.java   | 105 ++++++++++++++-------
 9 files changed, 114 insertions(+), 72 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 6e91443..d1777ea 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -253,4 +253,9 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,
MQ
    public Map<String, MQTTSessionState> getSessionStates() {
       return new HashMap<>(sessionStates);
    }
+
+   /** For DEBUG only */
+   public Map<String, MQTTConnection> getConnectedClients() {
+      return connectedClients;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 87a3c30..416e9a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -286,6 +286,12 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       return acceptor;
    }
 
+
+   /** No interface method, for tests only */
+   public Map<String, Acceptor> getAcceptors() {
+      return acceptors;
+   }
+
    @Override
    public void destroyAcceptor(String name) throws Exception {
       Acceptor acceptor = acceptors.get(name);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index 485d886..7a83172 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -800,7 +800,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
       }
    }
 
-   @Test(timeout = 240000)
+   @Test(timeout = 30000)
    public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
       String name = "exampleQueue1";
 
@@ -823,8 +823,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             queue = session.createQueue(name);
             MessageConsumer c = session.createConsumer(queue);
-            c.receive(1000);
-            producer.close();
+            Assert.assertNotNull(c.receive(1000));
             session.close();
          }
 
@@ -832,7 +831,7 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
          queue = session.createQueue(name);
          MessageConsumer c = session.createConsumer(queue);
          for (int i = 0; i < numMessages; i++) {
-            c.receive(1000);
+            Assert.assertNull(c.receive(1));
          }
          producer.close();
          session.close();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java
index afc34d9..12a7af4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConcurrentCreateDeleteProduceTest.java
@@ -69,6 +69,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase
{
 
    @Test
    public void testConcurrentProduceCreateAndDelete() throws Throwable {
+      locator.setBlockOnDurableSend(false).setBlockOnNonDurableSend(false);
       ClientSessionFactory factory = locator.createSessionFactory();
       ClientSession session = factory.createSession(true, true);
       ClientProducer producer = session.createProducer(ADDRESS);
@@ -84,7 +85,7 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase
{
          consumers[i].start();
       }
 
-      for (int i = 0; i < 50000 && running; i++) {
+      for (int i = 0; i < 1500 && running; i++) {
          producer.send(session.createMessage(true));
          //Thread.sleep(10);
       }
@@ -122,9 +123,10 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase
{
                session.createQueue(ADDRESS, queueName, true);
                ClientConsumer consumer = session.createConsumer(queueName);
                while (running) {
-                  ClientMessage msg = consumer.receive(5000);
+                  ClientMessage msg = consumer.receive(500);
                   if (msg == null) {
-                     break;
+                     if (running) continue;
+                     else break;
                   }
                   if (msgcount++ == 500) {
                      msgcount = 0;
@@ -134,7 +136,6 @@ public class ConcurrentCreateDeleteProduceTest extends ActiveMQTestBase
{
                consumer.close();
                session.commit();
                session.deleteQueue(queueName);
-               System.out.println("Deleting " + queueName);
             }
             session.close();
          } catch (Throwable e) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 39200e6..bbde2b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -509,7 +509,7 @@ public class ConsumerTest extends ActiveMQTestBase {
          }
 
          long time = System.currentTimeMillis();
-         int NUMBER_OF_MESSAGES = durable ? 500 : 5000;
+         int NUMBER_OF_MESSAGES = durable ? 5 : 50;
          for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
             TextMessage msg = session.createTextMessage("hello " + i);
             msg.setIntProperty("mycount", i);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 84727f6..add05f0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -107,6 +107,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
             locator.setConsumerWindowSize(0);
          }
       }
+      locator.setBlockOnNonDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
       settings.setMaxDeliveryAttempts(-1);
 
       server.getAddressSettingsRepository().addMatch("#", settings);
@@ -148,7 +149,6 @@ public class LargeMessageTest extends LargeMessageTestBase {
             // System.out.println("message:" + message);
             try {
                if (counter++ < 20) {
-                  Thread.sleep(100);
                   // System.out.println("Rollback");
                   message.acknowledge();
                   session.rollback();
@@ -1189,22 +1189,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
    @Test
    public void testFilePersistenceDelayed() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
+      testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 200);
    }
 
    @Test
    public void testFilePersistenceDelayedConsumer() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
+      testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 200);
    }
 
    @Test
    public void testFilePersistenceDelayedXA() throws Exception {
-      testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
+      testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 200);
    }
 
    @Test
    public void testFilePersistenceDelayedXAConsumer() throws Exception {
-      testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
+      testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize,
LargeMessageTest.RECEIVE_WAIT_TIME, 200);
    }
 
    @Test
@@ -1274,22 +1274,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
    @Test
    public void testSendRegularMessageNullPersistenceDelayed() throws Exception {
-      testChunks(false, false, true, false, false, false, false, false, false, 100, 100,
LargeMessageTest.RECEIVE_WAIT_TIME, 1000);
+      testChunks(false, false, true, false, false, false, false, false, false, 100, 100,
LargeMessageTest.RECEIVE_WAIT_TIME, 100);
    }
 
    @Test
    public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception {
-      testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(false, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
    public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception {
-      testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(true, false, true, false, false, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
    public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception
{
-      testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(true, false, true, false, false, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
@@ -1314,22 +1314,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
    @Test
    public void testSendRegularMessagePersistenceDelayed() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
    public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
    public void testSendRegularMessagePersistenceDelayedXA() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(false, false, true, false, true, false, false, false, false, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
    public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception {
-      testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
1000);
+      testChunks(false, false, true, false, true, false, false, false, true, 100, 100, LargeMessageTest.RECEIVE_WAIT_TIME,
100);
    }
 
    @Test
@@ -1659,11 +1659,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
          ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
 
-         // Wait the consumer to be complete with 10 messages before getting others
-         long timeout = System.currentTimeMillis() + 10000;
-         while (consumer.getBufferSize() < NUMBER_OF_MESSAGES && timeout >
System.currentTimeMillis()) {
-            Thread.sleep(10);
-         }
+         Wait.waitFor(() -> consumer.getBufferSize() >= NUMBER_OF_MESSAGES, 30_000,
100);
          Assert.assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize());
 
          // Reads the messages, rollback.. read them again
@@ -1711,14 +1707,14 @@ public class LargeMessageTest extends LargeMessageTestBase {
       ActiveMQServer server = null;
 
       final int SIZE = 10 * 1024;
-      final int NUMBER_OF_MESSAGES = 1000;
+      final int NUMBER_OF_MESSAGES = 100;
       try {
 
          server = createServer(true, isNetty(), storeType);
 
          server.start();
 
-         locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024);
+         locator.setMinLargeMessageSize(1024).setConsumerWindowSize(1024 * 1024).setBlockOnDurableSend(false).setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
 
          ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
 
@@ -1744,11 +1740,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
             ClientConsumerInternal consumer = (ClientConsumerInternal) session.createConsumer(ADDRESS);
 
-            // Wait the consumer to be complete with 10 messages before getting others
-            long timeout = System.currentTimeMillis() + 10000;
-            while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis())
{
-               Thread.sleep(10);
-            }
+            Assert.assertTrue(Wait.waitFor(() -> consumer.getBufferSize() >= 5, 30_000,
100));
+
 
             for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                ClientMessage msg = consumer.receive(10000);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
index cec8e1b..7bd9cbe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.InVMNodeManagerServer;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
@@ -142,12 +143,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
          producer.send(bm);
       }
 
+      Wait.assertEquals(numMessages, liveServer.locateQueue(jmsQueueName)::getMessageCount);
+
       conn.start();
 
       JMSFailoverListenerTest.log.info("sent messages and started connection");
 
-      Thread.sleep(2000);
-
       JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession());
 
       Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
index 302c64a..1b1a930 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java
@@ -346,14 +346,10 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase
{
                session.start();
 
                for (int i = 0; i < numberOfMessages; i++) {
-                  System.currentTimeMillis();
-
                   ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
 
                   Assert.assertNotNull(message);
 
-                  System.currentTimeMillis();
-
                   if (delayDelivery > 0) {
                      long originalTime = (Long) message.getObjectProperty(new SimpleString("original-time"));
                      Assert.assertTrue(System.currentTimeMillis() - originalTime + "<"
+ delayDelivery, System.currentTimeMillis() - originalTime >= delayDelivery);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
index 19360b1..f839109 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
@@ -21,8 +21,14 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
+import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
+import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
@@ -57,16 +63,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection subConnection2 = null;
       BlockingConnection pubConnection = null;
       try {
-         //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
+
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
          //Waiting for the first sub connection be closed
          assertTrue(waitConnectionClosed(subConnection1));
-
+         Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
+         subConnection1 = null;
          subConnection2.subscribe(topics);
 
          waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
@@ -107,11 +116,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
          pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
 
-         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message11);
-         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
 
       } finally {
@@ -145,7 +154,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection connection2 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@@ -244,14 +253,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
       BlockingConnection pubConnection = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
+
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
          //Waiting for the first sub connection be closed
          assertTrue(waitConnectionClosed(subConnection1));
+         Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
+         subConnection1 = null;
+
 
          subConnection2.subscribe(topics);
 
@@ -293,11 +308,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
          pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
 
-         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message11);
-         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
 
       } finally {
@@ -332,7 +347,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection connection2 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@@ -436,14 +451,19 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
       BlockingConnection pubConnection = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
+
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
          //Waiting for the first sub connection be closed
          assertTrue(waitConnectionClosed(subConnection1));
+         Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
+         subConnection1 = null;
 
          subConnection2.subscribe(topics);
 
@@ -485,11 +505,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
          pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
 
-         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message11);
-         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
 
       } finally {
@@ -523,7 +543,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection connection2 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
@@ -608,6 +628,20 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
 
    }
 
+   MQTTProtocolManager locateMQTTPM(ActiveMQServer server) {
+
+      RemotingServiceImpl impl = (RemotingServiceImpl) server.getRemotingService();
+      for (Acceptor acceptor : impl.getAcceptors().values()) {
+         AbstractAcceptor abstractAcceptor = (AbstractAcceptor) acceptor;
+         for (ProtocolManager manager : abstractAcceptor.getProtocolMap().values()) {
+            if (manager instanceof MQTTProtocolManager) {
+               return (MQTTProtocolManager) manager;
+            }
+         }
+      }
+      return null;
+   }
+
    @Test
    public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception
{
       final String MULTICAST_TOPIC = "multicast/test/+/some/#";
@@ -624,9 +658,10 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection pubConnection = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
@@ -673,11 +708,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE,
false);
          pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE,
false);
 
-         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message11);
-         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
 
       } finally {
@@ -712,10 +747,12 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
       BlockingConnection connection2 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
+         Wait.assertEquals(1, locateMQTTPM(servers[1]).getConnectedClients()::size);
          // Subscribe to topics
          connection1.subscribe(topics);
 
@@ -817,7 +854,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection connection2 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
          // Subscribe to topics
@@ -923,8 +960,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       BlockingConnection pubConnection = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+
+         Wait.assertEquals(1, locateMQTTPM(servers[0]).getConnectedClients()::size);
+
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
@@ -985,11 +1025,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          message11.ack();
          assertEquals(payload4, new String(message11.getPayload()));
 
-         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
-         Message message41 = subConnection2.receive(5, TimeUnit.SECONDS);
+         Message message41 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message41);
 
       } finally {
@@ -1024,10 +1064,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
       BlockingConnection pubConnection = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId);
 
          subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId);
+         Wait.assertEquals(2, locateMQTTPM(servers[0]).getConnectedClients()::size);
          subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId);
 
          //Waiting for the first sub connection be closed
@@ -1086,11 +1127,11 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
          pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);
          pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false);
 
-         Message message11 = subConnection3.receive(5, TimeUnit.SECONDS);
+         Message message11 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message11);
-         Message message21 = subConnection3.receive(5, TimeUnit.SECONDS);
+         Message message21 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message21);
-         Message message31 = subConnection3.receive(5, TimeUnit.SECONDS);
+         Message message31 = subConnection3.receive(100, TimeUnit.MILLISECONDS);
          assertNull(message31);
 
 
@@ -1131,7 +1172,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase
{
       BlockingConnection connection3 = null;
       try {
          //Waiting for resource initialization to complete
-         Thread.sleep(5000);
+         Thread.sleep(1000);
          connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
          connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);
          connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3);


Mime
View raw message