activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1900 fix race in STOMP auto-create
Date Thu, 07 Jun 2018 15:25:33 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 9fd43a6b7 -> 7b49a8e3d


ARTEMIS-1900 fix race in STOMP auto-create

(cherry picked from commit dc29a55e1b0106ddaf8e332d098cde4520bd639e)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b49a8e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b49a8e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b49a8e3

Branch: refs/heads/2.6.x
Commit: 7b49a8e3d2f38fc97311b46d7ebd395e5b88b879
Parents: 9fd43a6
Author: Justin Bertram <jbertram@apache.org>
Authored: Mon Jun 4 14:27:31 2018 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Jun 7 11:25:24 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  7 +-
 .../protocol/stomp/StompProtocolManager.java    |  4 +-
 .../core/protocol/stomp/StompSession.java       | 10 ++-
 .../stomp/StompTestMultiThreaded.java           | 95 ++++++++++++++++++++
 4 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b49a8e3/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 171d7be..9417409 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -276,7 +276,6 @@ public final class StompConnection implements RemotingConnection {
          AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
          AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
          RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType()
: routingType;
-         boolean checkAnycast = false;
          /**
           * If the address doesn't exist then it is created if possible.
           * If the address does exist but doesn't support the routing-type then the address
is updated if possible.
@@ -285,8 +284,6 @@ public final class StompConnection implements RemotingConnection {
             if (addressSettings.isAutoCreateAddresses()) {
                session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
             }
-
-            checkAnycast = true;
          } else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType))
{
             if (addressSettings.isAutoCreateAddresses()) {
                EnumSet<RoutingType> routingTypes = EnumSet.noneOf(RoutingType.class);
@@ -296,12 +293,10 @@ public final class StompConnection implements RemotingConnection {
                routingTypes.add(effectiveAddressRoutingType);
                manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
             }
-
-            checkAnycast = true;
          }
 
          // only auto create the queue if the address is ANYCAST
-         if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST &&
addressSettings.isAutoCreateQueues()) {
+         if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()
&& manager.getServer().locateQueue(simpleQueue) == null) {
             session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType()
: routingType, null, false, true, true);
          }
       } catch (ActiveMQQueueExistsException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b49a8e3/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 888674c..c5782c1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -190,8 +190,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,
St
    // Public --------------------------------------------------------
 
    public boolean send(final StompConnection connection, final StompFrame frame) {
-      if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQServerLogger.LOGGER.trace("sent " + frame);
+      if (ActiveMQStompProtocolLogger.LOGGER.isTraceEnabled()) {
+         ActiveMQStompProtocolLogger.LOGGER.trace("sent " + frame);
       }
 
       invokeInterceptors(this.outgoingInterceptors, frame, connection);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b49a8e3/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 8a573e6..e370c81 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -252,11 +252,13 @@ public class StompSession implements SessionCallback {
                                String destination,
                                String selector,
                                String ack) throws Exception {
+      SimpleString address = SimpleString.toSimpleString(destination);
       SimpleString queueName = SimpleString.toSimpleString(destination);
+      SimpleString selectorSimple = SimpleString.toSimpleString(selector);
       boolean pubSub = false;
       final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ?
-1 : consumerCredits;
 
-      Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
+      Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
       boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
       if (topic) {
          // subscribes to a topic
@@ -267,14 +269,14 @@ public class StompSession implements SessionCallback {
             }
             queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
             if (manager.getServer().locateQueue(queueName) == null) {
-               session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector),
false, true);
+               session.createQueue(address, queueName, selectorSimple, false, true);
             }
          } else {
             queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
-            session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector),
true, false);
+            session.createQueue(address, queueName, selectorSimple, true, false);
          }
       }
-      final ServerConsumer consumer = topic ? session.createConsumer(consumerID, queueName,
null, false, false, 0) : session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector),
false, false, 0);
+      final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic
? null : selectorSimple, false, false, 0);
       StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName,
pubSub);
       subscriptions.put(consumerID, subscription);
       session.start();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b49a8e3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestMultiThreaded.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestMultiThreaded.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestMultiThreaded.java
new file mode 100644
index 0000000..63dd35b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestMultiThreaded.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.net.URI;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class StompTestMultiThreaded extends StompTestBase {
+
+   private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+   private static final SimpleString QUEUE = new SimpleString("x");
+
+   class SomeConsumer extends Thread {
+
+      private final StompClientConnection conn;
+
+      boolean failed = false;
+
+      SomeConsumer() throws Exception {
+         URI uri = createStompClientUri(scheme, "localhost", 61614);
+         this.conn = StompClientConnectionFactory.createClientConnection(uri);
+      }
+
+      @Override
+      public void run() {
+         try {
+            conn.connect(defUser, defPass);
+            if (!subscribe(conn, UUID.randomUUID().toString(), Stomp.Headers.Subscribe.AckModeValues.AUTO,
null, null, "/queue/" + QUEUE, true).getCommand().equals(Stomp.Responses.RECEIPT)) {
+               failed = true;
+            }
+         } catch (Throwable e) {
+            failed = true;
+         } finally {
+            try {
+               conn.disconnect();
+            } catch (Exception e) {
+            }
+         }
+      }
+   }
+
+   @Test
+   public void testTwoConcurrentSubscribers() throws Exception {
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteAddresses(false).setAutoDeleteQueues(false));
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://localhost:61614?protocols=STOMP&anycastPrefix=/queue/").start();
+
+      int nThreads = 2;
+
+      SomeConsumer[] consumers = new SomeConsumer[nThreads];
+      for (int j = 0; j < 1000; j++) {
+
+         for (int i = 0; i < nThreads; i++) {
+            consumers[i] = new SomeConsumer();
+         }
+
+         for (int i = 0; i < nThreads; i++) {
+            consumers[i].start();
+         }
+
+         for (SomeConsumer consumer : consumers) {
+            consumer.join();
+            Assert.assertFalse(consumer.failed);
+         }
+
+         // delete queue here so it can be auto-created again during the next loop iteration
+         server.getActiveMQServer().locateQueue(QUEUE).deleteQueue();
+      }
+   }
+}


Mime
View raw message