activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1930 require STOMP durable sub name to unsubscribe
Date Fri, 22 Jun 2018 19:47:21 GMT
ARTEMIS-1930 require STOMP durable sub name to unsubscribe


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ed7a616
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ed7a616
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ed7a616

Branch: refs/heads/master
Commit: 1ed7a616ee5e9beedb173c56a9226e314339a8e4
Parents: 4f0bb98
Author: Justin Bertram <jbertram@apache.org>
Authored: Mon Jun 18 15:19:48 2018 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Jun 22 15:47:14 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       | 16 ++++-----
 .../core/protocol/stomp/StompSubscription.java  | 14 ++++----
 .../tests/integration/stomp/StompTest.java      | 34 ++++++++++++++++++++
 3 files changed, 49 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ed7a616/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e370c81..291634f 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
@@ -255,14 +256,12 @@ public class StompSession implements SessionCallback {
       SimpleString address = SimpleString.toSimpleString(destination);
       SimpleString queueName = SimpleString.toSimpleString(destination);
       SimpleString selectorSimple = SimpleString.toSimpleString(selector);
-      boolean pubSub = false;
       final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
 
       Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
-      boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
-      if (topic) {
+      boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
+      if (multicast) {
          // subscribes to a topic
-         pubSub = true;
          if (durableSubscriptionName != null) {
             if (clientID == null) {
                throw BUNDLE.missingClientID();
@@ -276,8 +275,8 @@ public class StompSession implements SessionCallback {
             session.createQueue(address, queueName, selectorSimple, true, false);
          }
       }
-      final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
-      StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
+      final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
+      StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast);
       subscriptions.put(consumerID, subscription);
       session.start();
       return () -> consumer.receiveCredits(receiveCredits);
@@ -295,14 +294,15 @@ public class StompSession implements SessionCallback {
             iterator.remove();
             SimpleString queueName = sub.getQueueName();
             session.closeConsumer(consumerID);
-            if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
+            Queue queue = manager.getServer().locateQueue(queueName);
+            if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) {
                session.deleteQueue(queueName);
             }
             result = true;
          }
       }
 
-      if (!result && durableSubscriptionName != null && clientID != null) {
+      if (durableSubscriptionName != null && clientID != null) {
          SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
          if (manager.getServer().locateQueue(queueName) != null) {
             session.deleteQueue(queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ed7a616/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index a1417ad..de6044b 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -29,18 +29,18 @@ public class StompSubscription {
 
    private final SimpleString queueName;
 
-   // whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
-   private final boolean pubSub;
+   // whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
+   private final boolean multicast;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
+   public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
       this.subID = subID;
       this.ack = ack;
       this.queueName = queueName;
-      this.pubSub = pubSub;
+      this.multicast = multicast;
    }
 
    // Public --------------------------------------------------------
@@ -57,13 +57,13 @@ public class StompSubscription {
       return queueName;
    }
 
-   public boolean isPubSub() {
-      return pubSub;
+   public boolean isMulticast() {
+      return multicast;
    }
 
    @Override
    public String toString() {
-      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
+      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]";
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ed7a616/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index bc363f2..5c6eefe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1344,6 +1344,40 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testDurableUnSubscribeWithoutDurableSubName() throws Exception {
+      server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/');
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST));
+      conn.connect(defUser, defPass, "myclientid");
+      String subId = UUID.randomUUID().toString();
+      String durableSubName = UUID.randomUUID().toString();
+      String receipt = UUID.randomUUID().toString();
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
+                                   .addHeader(Stomp.Headers.Unsubscribe.ID, subId)
+                                   .addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
+                                   .addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
+
+      frame = conn.sendFrame(frame);
+      assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
+
+      receipt = UUID.randomUUID().toString();
+      frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
+                  .addHeader(Stomp.Headers.Unsubscribe.ID, subId)
+                  .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
+
+      frame = conn.sendFrame(frame);
+      assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      conn.disconnect();
+
+      // make sure the durable subscription queue is still there
+      assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
+   }
+
+   @Test
    public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
       conn.connect(defUser, defPass, "myclientid");
       subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);


Mime
View raw message