activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gaohow...@apache.org
Subject [1/2] activemq-artemis git commit: [ARTEMIS-1030] add support for auto mapping openwire virtual topic consumer destinations to FQQN
Date Mon, 22 Jan 2018 12:33:30 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 6428a897c -> 701ab1feb


[ARTEMIS-1030] add support for auto mapping openwire virtual topic consumer destinations to
FQQN


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/82e4f465
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/82e4f465
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/82e4f465

Branch: refs/heads/master
Commit: 82e4f465eeba0ba89f123efa7c10a7fd2202c8c1
Parents: 6428a89
Author: gtully <gary.tully@gmail.com>
Authored: Mon Jan 22 11:10:46 2018 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Jan 22 11:10:46 2018 +0000

----------------------------------------------------------------------
 .../openwire/OpenWireProtocolManager.java       |  75 ++++++++++++++
 .../core/protocol/openwire/amq/AMQSession.java  |   1 +
 docs/migration-guide/en/VirtualTopics.md        |  24 ++++-
 .../en/protocols-interoperability.md            |  19 ++++
 .../openwire/OpenWireProtocolManagerTest.java   | 101 +++++++++++++++++++
 .../VirtualTopicToFQQNOpenWireTest.java         |  87 ++++++++++++++++
 6 files changed, 305 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 87925b5..301b5fc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -49,14 +49,18 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.selector.impl.LRUCache;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -70,6 +74,8 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.filter.DestinationPath;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.state.ProducerState;
@@ -128,6 +134,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
+   private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new
HashMap<>();
+   protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache
= new LRUCache();
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer
server) {
       this.factory = factory;
       this.server = server;
@@ -607,4 +616,70 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
    public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects)
{
       this.suppressInternalManagementObjects = suppressInternalManagementObjects;
    }
+
+   public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
+      for (String filter : virtualTopicConsumerWildcards.split(",")) {
+         String[] wildcardLimitPair = filter.split(";");
+         vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])),
Integer.valueOf(wildcardLimitPair[1]));
+      }
+   }
+
+   public void setVirtualTopicConsumerLruCacheMax(int max) {
+      vtDestMapCache.setMaxCacheSize(max);
+   }
+
+   public ActiveMQDestination virtualTopicConsumerToFQQN(final ActiveMQDestination destination)
{
+
+      if (vtConsumerDestinationMatchers.isEmpty()) {
+         return destination;
+      }
+
+      ActiveMQDestination mappedDestination = null;
+      synchronized (vtDestMapCache) {
+         mappedDestination = vtDestMapCache.get(destination);
+      }
+
+      if (mappedDestination != null) {
+         return mappedDestination;
+      }
+
+      for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet())
{
+         if (candidate.getKey().matches(destination)) {
+            // convert to matching FQQN
+            String[] paths = DestinationPath.getDestinationPaths(destination);
+            StringBuilder fqqn = new StringBuilder();
+            int filterPathTerminus = candidate.getValue();
+            // address - ie: topic
+            for (int i = filterPathTerminus; i < paths.length; i++) {
+               if (i > filterPathTerminus) {
+                  fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
+               }
+               fqqn.append(paths[i]);
+            }
+            fqqn.append(CompositeAddress.SEPARATOR);
+            // consumer queue
+            for (int i = 0; i < filterPathTerminus; i++) {
+               if (i > 0) {
+                  fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
+               }
+               fqqn.append(paths[i]);
+            }
+            mappedDestination = new ActiveMQQueue(fqqn.toString());
+            break;
+         }
+      }
+      if (mappedDestination == null) {
+         // cache the identity mapping
+         mappedDestination = destination;
+      }
+      synchronized (vtDestMapCache) {
+         ActiveMQDestination existing = vtDestMapCache.put(destination, mappedDestination);
+         if (existing != null) {
+            // some one beat us to the put, revert
+            vtDestMapCache.put(destination, existing);
+            mappedDestination = existing;
+         }
+      }
+      return mappedDestination;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index b0eb678..d284d6c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -173,6 +173,7 @@ public class AMQSession implements SessionCallback {
             isInternalAddress = connection.isSuppressInternalManagementObjects();
          }
          if (openWireDest.isQueue()) {
+            openWireDest = protocolManager.virtualTopicConsumerToFQQN(openWireDest);
             SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
 
             if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/docs/migration-guide/en/VirtualTopics.md
----------------------------------------------------------------------
diff --git a/docs/migration-guide/en/VirtualTopics.md b/docs/migration-guide/en/VirtualTopics.md
index e0775ad..0cf406d 100644
--- a/docs/migration-guide/en/VirtualTopics.md
+++ b/docs/migration-guide/en/VirtualTopics.md
@@ -11,12 +11,19 @@ component must be unique to a connection on the broker. This means that
the subs
 not possible to load balance the stream of messages across consumers and quick failover is
difficult because the
 existing connection state on the broker needs to be first disposed.
 With virtual topics, each subscription's stream of messages is redirected to a queue.
+
+In Artemis there are two alternatives, the new JMS 2.0 api or direct access to a subscription
queue via the FQQN.
  
-JMS2.0 adds the possibility of shared subscriptions with new API's that are fully supported
in Artemis.
+JMS 2.0 shared subscriptions
+----------------------------
+JMS 2.0 adds the possibility of shared subscriptions with new API's that are fully supported
in Artemis.
+
+Fully Qualified Queue name (FQQN)
+---------------------------------
 Secondly, Artemis uses a queue per topic subscriber model internally and it is possibly to
directly address the
 subscription queue using it's Fully Qualified Queue name (FQQN).
 
-For example, a default 5.x consumer for topic `VirtualTopic.Orders` subscription `A`:
+For example, a default 5.x consumer destination for topic `VirtualTopic.Orders` subscription
`A`:
 ```
     ...
     Queue subscriptionQueue = session.createQueue("Consumer.A.VirtualTopic.Orders");
@@ -30,6 +37,19 @@ would be replaced with an Artemis FQQN comprised of the address and queue.
     session.createConsumer(subscriptionQueue);
 ```
 
+This does require modification to the destination name used by consumers which is not ideal.
+If OpenWire clients cannot be modified, Artemis supports a virtual topic wildcard filter
+mechanism on the openwire protocol handler that will automatically convert the consumer destination
into the
+corresponding FQQN.
+The format is a comma separated list of strings pairs, delimited with a ';'. Each pair identifies
a filter to match
+the virtual topic consumer destination and an int that specifies the number of path matches
that terminate the consumer
+queue identity.
+
+E.g: For the default 5.x virtual topic consumer prefix of ```Consumer.*.``` the url parameter
```virtualTopicConsumerWildcards``` should be: ```Consumer.*.>;2```.
+In this way a consumer destination of ```Consumer.A.VirtualTopic.Orders``` will be transformed
into a FQQN of
+```VirtualTopic.Orders::Consumer.A```. 
+
+
 Durable topic subscribers in a network of brokers
 -------------------------------------------------
 The store and forward network bridges in 5.x create a durable subscriber per destination.
As demand migrates across a

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 6114de0..d7c22e3 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -204,6 +204,25 @@ The two parameters are configured on openwire acceptors, via URLs or
API. For ex
 
     <acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
 
+### Virtual Topic Consumer Destination Translation
+
+For existing Openwire consumers of virtual topic destinations it is possible to configure
a mapping function
+that will translate the virtual topic consumer destination into a FQQN address. This address
then represents
+the consumer as a multicast binding to an address representing the virtual topic. 
+
+The configuration string property ```virtualTopicConsumerWildcards``` has two parts seperated
by a ```;```. 
+The first is the 5.x style destination filter that identifies the destination as belonging
to a virtual topic.
+The second identifies the number of ```paths``` that identify the consumer queue such that
it can be parsed from the
+destination.
+For example, the default 5.x virtual topic with consumer prefix of ```Consumer.*.```, would
require a
+```virtualTopicConsumerWildcards``` filter of:
+
+     <acceptor name="artemis">tcp://127.0.0.1:61616?protocols=OPENWIRE;virtualTopicConsumerWildcards=Consumer.*.>;2</acceptor>
+
+This will translate ```Consumer.A.VirtualTopic.Orders``` into a FQQN of ```VirtualTopic.Orders::Consumer.A```
using the
+int component ```2``` of the configuration to identify the consumer queue as the first two
paths of the destination.
+```virtualTopicConsumerWildcards``` is multi valued using a ```,``` separator. 
+  
 ## MQTT
 
 MQTT is a light weight, client to server, publish / subscribe messaging protocol.  MQTT has
been specifically

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java
new file mode 100644
index 0000000..85b5685
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.core.server.impl.Activation;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.selector.impl.LRUCache;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class OpenWireProtocolManagerTest {
+
+   OpenWireProtocolManager underTest;
+   LRUCache lruCacheRef;
+
+   @Test
+   public void testVtAutoConversion() throws Exception {
+      underTest = new OpenWireProtocolManager(null, new DummyServer()) {
+         @Override
+         public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination)
{
+            if (lruCacheRef == null) {
+               lruCacheRef = vtDestMapCache;
+            }
+            return super.virtualTopicConsumerToFQQN(destination);
+         }
+      };
+
+      final int maxCacheSize = 10;
+      underTest.setVirtualTopicConsumerLruCacheMax(10);
+      underTest.setVirtualTopicConsumerWildcards("A.>;1,B.*.>;2,C.*.*.*.EE;3");
+
+      ActiveMQDestination A = new org.apache.activemq.command.ActiveMQQueue("A.SomeTopic");
+      assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic::A"), underTest.virtualTopicConsumerToFQQN(A));
+
+      ActiveMQDestination B = new org.apache.activemq.command.ActiveMQQueue("B.b.SomeTopic.B");
+      assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.B::B.b"), underTest.virtualTopicConsumerToFQQN(B));
+
+      ActiveMQDestination C = new org.apache.activemq.command.ActiveMQQueue("C.c.c.SomeTopic.EE");
+      assertEquals(new org.apache.activemq.command.ActiveMQQueue("SomeTopic.EE::C.c.c"),
underTest.virtualTopicConsumerToFQQN(C));
+
+      for (int i = 0; i < maxCacheSize; i++) {
+         ActiveMQDestination identity = new org.apache.activemq.command.ActiveMQQueue("Identity"
+ i);
+         assertEquals(identity, underTest.virtualTopicConsumerToFQQN(identity));
+      }
+
+      assertFalse(lruCacheRef.containsKey(A));
+   }
+
+   static final class DummyServer extends ActiveMQServerImpl {
+
+      @Override
+      public ClusterManager getClusterManager() {
+         return new ClusterManager(getExecutorFactory(), this, null, null, null, null, null,
false);
+      }
+
+      @Override
+      public ExecutorFactory getExecutorFactory() {
+         return new ExecutorFactory() {
+            @Override
+            public ArtemisExecutor getExecutor() {
+               return null;
+            }
+         };
+      }
+
+      @Override
+      public Activation getActivation() {
+         return new Activation() {
+            @Override
+            public void close(boolean permanently, boolean restarting) throws Exception {
+
+            }
+
+            @Override
+            public void run() {
+
+            }
+         };
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82e4f465/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
new file mode 100644
index 0000000..34d08bc
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VirtualTopicToFQQNOpenWireTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Set;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.junit.Test;
+
+public class VirtualTopicToFQQNOpenWireTest extends OpenWireTestBase {
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
+      for (TransportConfiguration tc : acceptors) {
+         if (tc.getName().equals("netty")) {
+            tc.getExtraParams().put("virtualTopicConsumerWildcards", "Consumer.*.>;2");
+            tc.getExtraParams().put("virtualTopicConsumerLruCacheMax", "10000");
+
+         }
+      }
+   }
+
+   @Test
+   public void testAutoVirtualTopicFQQN() throws Exception {
+      Connection connection = null;
+
+      SimpleString topic = new SimpleString("VirtualTopic.Orders");
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
+      this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
+
+      try {
+         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
+         activeMQConnectionFactory.setWatchTopicAdvisories(false);
+         connection = activeMQConnectionFactory.createConnection();
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createTopic(topic.toString());
+
+         MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A."
+ topic.toString()));
+         MessageConsumer messageConsumerB = session.createConsumer(session.createQueue("Consumer.B."
+ topic.toString()));
+
+         MessageProducer producer = session.createProducer(destination);
+         TextMessage message = session.createTextMessage("This is a text message");
+         producer.send(message);
+
+         TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
+         TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
+
+         assertTrue((messageReceivedA != null && messageReceivedB != null));
+         String text = messageReceivedA.getText();
+         assertEquals("This is a text message", text);
+
+         messageConsumerA.close();
+         messageConsumerB.close();
+
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+}


Mime
View raw message