activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour.
Date Mon, 05 Jun 2017 22:19:38 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7036e5038 -> 56485a204


ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour.

Add test case, to prove the issue, and then obviously ensure it works, post fix.
Apply changes in logic of createQueueName to handle global better and fix the behaviour.
Create queues so names are same as behaviour with core client.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d9d96997
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d9d96997
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d9d96997

Branch: refs/heads/master
Commit: d9d9699732e4c9de0d9f8f009a4b1fef26b37589
Parents: 7036e50
Author: Michael Andre Pearce <Michael.Andre.Pearce@me.com>
Authored: Mon Jun 5 13:45:25 2017 +0100
Committer: Michael Andre Pearce <Michael.Andre.Pearce@me.com>
Committed: Mon Jun 5 22:40:54 2017 +0100

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  |   7 +-
 .../amqp/ClientDefinedMultiConsumerTest.java    |  32 ++--
 .../integration/amqp/JMSSharedConsumerTest.java | 166 +++++++++++++++++++
 .../amqp/JMSSharedDurableConsumerTest.java      | 166 +++++++++++++++++++
 4 files changed, 350 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d9d96997/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 6dcf41a..ad8bee7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -738,16 +738,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements
Pr
                                          boolean shared,
                                          boolean global,
                                          boolean isVolatile) {
-      String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
+      String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId
+ "." + pubId;
       if (shared) {
          if (queue.contains("|")) {
             queue = queue.split("\\|")[0];
          }
          if (isVolatile) {
-            queue += ":shared-volatile";
-         }
-         if (global) {
-            queue += ":global";
+            queue = "nonDurable" + "." + queue;
          }
       }
       return queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d9d96997/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
index 51c70ee..3f8da1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
       receiver2.close();
       //check its been deleted
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))
== null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))
== null;
          }
       }, 1000);
       connection.close();
@@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       AddressInfo addressInfo = new AddressInfo(address);
       addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
       server.addAddressInfo(addressInfo);
-      server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"),
null, true, false, -1, false, false);
+      server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"),
null, true, false, -1, false, false);
       AmqpClient client = createAmqpClient();
 
       AmqpConnection connection = addConnection(client.connect("myClientId"));
@@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
       receiver2.close();
       //check its **Hasn't** been deleted
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
       connection.close();
    }
 
@@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
       //check its been deleted
       connection.close();
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))
== null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))
== null;
          }
       }, 1000);
    }
@@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")));
       receiver2.close();
       //check its been deleted
       Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisfied() throws Exception {
-            return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))
== null;
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub"))
== null;
          }
       }, 1000);
       connection.close();
@@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(amqpMessage);
       amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
       assertNotNull(amqpMessage);
-      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
+      assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount());
       receiver.close();
-      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+      assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
       receiver2.close();
       //check its been deleted
-      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
+      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
       connection.close();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d9d96997/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
new file mode 100644
index 0000000..c49fcff
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.junit.Test;
+
+public class JMSSharedConsumerTest extends JMSClientTestSupport {
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   private void testSharedConsumer(Connection connection1, Connection connection2) throws
JMSException {
+      try {
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic topic = session1.createTopic(getTopicName());
+         Topic topic2 = session2.createTopic(getTopicName());
+
+         final MessageConsumer consumer1 = session1.createSharedConsumer(topic, "SharedConsumer");
+         final MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "SharedConsumer");
+
+         MessageProducer producer = session1.createProducer(topic);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         connection1.start();
+
+         TextMessage message = session1.createTextMessage();
+         message.setText("hello");
+         producer.send(message);
+
+         Message message1 = consumer1.receive(100);
+         Message message2 = consumer2.receive(100);
+
+         Message received = null;
+         if (message1 != null) {
+            assertNull("Message should only be delivered once per subscribtion but see twice",
message2);
+            received = message1;
+         } else {
+            received = message2;
+         }
+         assertNotNull("Should have received a message by now.", received);
+         assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+      } finally {
+         connection1.close();
+         connection2.close();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedConsumer() throws Exception {
+      Connection connection = createConnection(); //AMQP
+      Connection connection2 = createConnection(); //AMQP
+
+      testSharedConsumer(connection, connection2);
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedConsumerWithArtemisClient() throws Exception {
+
+      Connection connection = createCoreConnection(); //CORE
+      Connection connection2 = createCoreConnection(); //CORE
+
+      testSharedConsumer(connection, connection2);
+
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
+
+      Connection connection = createConnection(); //AMQP
+      Connection connection2 = createCoreConnection(); //CORE
+
+      testSharedConsumer(connection, connection2);
+
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
+
+      Connection connection = createCoreConnection(); //CORE
+      Connection connection2 = createConnection(); //AMQP
+
+      testSharedConsumer(connection, connection2);
+
+   }
+
+
+   protected String getBrokerCoreJMSConnectionString() {
+
+      try {
+         int port = AMQP_PORT;
+
+         String uri = null;
+
+         if (isUseSSL()) {
+            uri = "tcp://127.0.0.1:" + port;
+         } else {
+            uri = "tcp://127.0.0.1:" + port;
+         }
+
+         if (!getJmsConnectionURIOptions().isEmpty()) {
+            uri = uri + "?" + getJmsConnectionURIOptions();
+         }
+
+         return uri;
+      } catch (Exception e) {
+         throw new RuntimeException();
+      }
+   }
+
+   protected Connection createCoreConnection() throws JMSException {
+      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
+   }
+
+   private Connection createCoreConnection(String connectionString, String username, String
password, String clientId, boolean start) throws JMSException {
+      ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
+
+      Connection connection = trackJMSConnection(factory.createConnection(username, password));
+
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+
+      if (clientId != null && !clientId.isEmpty()) {
+         connection.setClientID(clientId);
+      }
+
+      if (start) {
+         connection.start();
+      }
+
+      return connection;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d9d96997/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
new file mode 100644
index 0000000..040506b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.junit.Test;
+
+public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   private void testSharedDurableConsumer(Connection connection1, Connection connection2)
throws JMSException {
+      try {
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic topic = session1.createTopic(getTopicName());
+         Topic topic2 = session2.createTopic(getTopicName());
+
+         final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer");
+         final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer");
+
+         MessageProducer producer = session1.createProducer(topic);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         connection1.start();
+
+         TextMessage message = session1.createTextMessage();
+         message.setText("hello");
+         producer.send(message);
+
+         Message message1 = consumer1.receive(100);
+         Message message2 = consumer2.receive(100);
+
+         Message received = null;
+         if (message1 != null) {
+            assertNull("Message should only be delivered once per subscribtion but see twice",
message2);
+            received = message1;
+         } else {
+            received = message2;
+         }
+         assertNotNull("Should have received a message by now.", received);
+         assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+      } finally {
+         connection1.close();
+         connection2.close();
+      }
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedDurableConsumer() throws Exception {
+      Connection connection = createConnection(); //AMQP
+      Connection connection2 = createConnection(); //AMQP
+
+      testSharedDurableConsumer(connection, connection2);
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedDurableConsumerWithArtemisClient() throws Exception {
+
+      Connection connection = createCoreConnection(); //CORE
+      Connection connection2 = createCoreConnection(); //CORE
+
+      testSharedDurableConsumer(connection, connection2);
+
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception
{
+
+      Connection connection = createConnection(); //AMQP
+      Connection connection2 = createCoreConnection(); //CORE
+
+      testSharedDurableConsumer(connection, connection2);
+
+   }
+
+   @Test(timeout = 30000)
+   public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception
{
+
+      Connection connection = createCoreConnection(); //CORE
+      Connection connection2 = createConnection(); //AMQP
+
+      testSharedDurableConsumer(connection, connection2);
+
+   }
+
+
+   protected String getBrokerCoreJMSConnectionString() {
+
+      try {
+         int port = AMQP_PORT;
+
+         String uri = null;
+
+         if (isUseSSL()) {
+            uri = "tcp://127.0.0.1:" + port;
+         } else {
+            uri = "tcp://127.0.0.1:" + port;
+         }
+
+         if (!getJmsConnectionURIOptions().isEmpty()) {
+            uri = uri + "?" + getJmsConnectionURIOptions();
+         }
+
+         return uri;
+      } catch (Exception e) {
+         throw new RuntimeException();
+      }
+   }
+
+   protected Connection createCoreConnection() throws JMSException {
+      return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
+   }
+
+   private Connection createCoreConnection(String connectionString, String username, String
password, String clientId, boolean start) throws JMSException {
+      ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
+
+      Connection connection = trackJMSConnection(factory.createConnection(username, password));
+
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+
+      if (clientId != null && !clientId.isEmpty()) {
+         connection.setClientID(clientId);
+      }
+
+      if (start) {
+         connection.start();
+      }
+
+      return connection;
+   }
+
+}


Mime
View raw message