This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 7096bc1 ARTEMIS-2649 always over-write ORIG message props
new 2d08e45 This closes #3153
7096bc1 is described below
commit 7096bc187a0182a597122ed4011397d78017014b
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Fri Mar 13 08:38:01 2020 -0500
ARTEMIS-2649 always over-write ORIG message props
ORIG message propertes like _AMQ_ORIG_ADDRESS are added to messages
during various broker operations (e.g. diverting a message, expiring a
message, etc.). However, if multiple operations try to set these
properties on the same message (e.g. administratively moving a message
which eventually gets sent to a dead-letter address) then important
details can be lost. This is particularly problematic when using
auto-created dead-letter or expiry resources which use filters based on
_AMQ_ORIG_ADDRESS and can lead to message loss.
This commit simply over-writes the existing ORIG properties rather than
preserving them so that the most recent information is available.
---
.../apache/activemq/artemis/api/core/Message.java | 22 +-------
.../artemis/core/server/ActiveMQServerLogger.java | 5 ++
.../artemis/core/server/impl/QueueImpl.java | 14 ++++-
docs/user-manual/en/copied-message-properties.md | 37 ++++++++++++
docs/user-manual/en/diverts.md | 7 +--
docs/user-manual/en/message-expiry.md | 35 ++++--------
docs/user-manual/en/undelivered-messages.md | 34 ++++-------
.../amqp/DLQAfterExpiredMessageTest.java | 8 +--
.../tests/integration/client/LargeMessageTest.java | 58 ++++++++++++++++++-
.../tests/integration/divert/DivertTest.java | 36 +++++++++++-
.../integration/management/QueueControlTest.java | 41 ++++++++++++++
.../server/AutoCreateDeadLetterResourcesTest.java | 66 +++++++++++++++++++++-
12 files changed, 279 insertions(+), 84 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b8f6303..fdb5188 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -463,25 +463,9 @@ public interface Message {
}
default void referenceOriginalMessage(final Message original, String originalQueue) {
- Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
-
- if (queueOnMessage != null) {
- setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
- } else if (originalQueue != null) {
- setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
- }
-
- Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
-
- if (originalID != null) {
- setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
-
- setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
- } else {
- setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
-
- setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
- }
+ setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
+ setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
+ setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
// reset expiry
setExpiration(0);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 99ab6ad..9369696 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1674,6 +1674,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222289, value = "Did not route to any matching bindings on dead-letter-address
{0} and auto-create-dead-letter-resources is true; dropping message: {1}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void noMatchingBindingsOnDLAWithAutoCreateDLAResources(SimpleString address, String message);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0caf7ad..84a6140 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@@ -3477,7 +3478,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress,
name);
- move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
+ RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED,
null);
+
+ // this shouldn't happen, but in case it does it's better to log a message than
just drop the message silently
+ if (status.equals(RoutingStatus.NO_BINDINGS) && server.getAddressSettingsRepository().getMatch(getAddress().toString()).isAutoCreateDeadLetterResources())
{
+ ActiveMQServerLogger.LOGGER.noMatchingBindingsOnDLAWithAutoCreateDLAResources(deadLetterAddress,
ref.toString());
+ }
return true;
}
} else {
@@ -3511,7 +3517,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
}
}
- private void move(final Transaction originalTX,
+ private RoutingStatus move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref,
@@ -3531,13 +3537,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
copyMessage.setAddress(address);
- postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
+ RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate,
binding);
acknowledge(tx, ref, reason, consumer);
if (originalTX == null) {
tx.commit();
}
+
+ return routingStatus;
}
/*
diff --git a/docs/user-manual/en/copied-message-properties.md b/docs/user-manual/en/copied-message-properties.md
new file mode 100644
index 0000000..1063c4b
--- /dev/null
+++ b/docs/user-manual/en/copied-message-properties.md
@@ -0,0 +1,37 @@
+# Properties for Copied Messages
+
+There are several operations within the broker that result in copying a
+message. These include:
+
+- Diverting a message from one address to another.
+- Moving an expired message from a queue to the configured `expiry-address`
+- Moving a message which has exceeded its `max-delivery-attempts` from a queue
+ to the configured `dead-letter-address`
+- Using the management API to administratively move messages from one queue to
+ another
+
+When this happens the body and properties of the original message are copied to
+a new message. However, the copying process removes some potentially important
+pieces of data so those are preserved in the following special message
+properties:
+
+- `_AMQ_ORIG_ADDRESS`
+
+ a String property containing the *original address* of the message
+
+- `_AMQ_ORIG_QUEUE`
+
+ a String property containing the *original queue* of the message
+
+- `_AMQ_ORIG_MESSAGE_ID`
+
+ a String property containing the *original message ID* of the message
+
+It's possible for the aforementioned operations to be combined. For example, a
+message may be diverted from one address to another where it lands in a queue
+and a consumer tries & fails to consume it such that the message is then sent
+to a dead-letter address. Or a message may be administratively moved from one
+queue to another where it then expires.
+
+In cases like these the `ORIG` properties will contain the information from the
+_last_ (i.e. most recent) operation.
\ No newline at end of file
diff --git a/docs/user-manual/en/diverts.md b/docs/user-manual/en/diverts.md
index a753707..1025a82 100644
--- a/docs/user-manual/en/diverts.md
+++ b/docs/user-manual/en/diverts.md
@@ -57,12 +57,7 @@ geographically distributed servers, creating your global messaging mesh.
Diverts are defined as xml in the `broker.xml` file at the `core` attribute level.
There can be zero or more diverts in the file.
-Diverted message gets a new message ID, and its address is set to a forward
-address. To access original values, use message properties: original destination
-is stored in a String property `_AMQ_ORIG_ADDRESS` (`Message.HDR_ORIGINAL_ADDRESS`
-constant from the Core API), and the original message ID in a Long property
-`_AMQ_ORIG_MESSAGE_ID` (`Message.HDR_ORIG_MESSAGE_ID` constant from the
-Core API).
+Diverted messages get [special properties](copied-message-properties.md).
Please see the examples for a full working example showing you how to
configure and use diverts.
diff --git a/docs/user-manual/en/message-expiry.md b/docs/user-manual/en/message-expiry.md
index 2d8e198..1003fd2 100644
--- a/docs/user-manual/en/message-expiry.md
+++ b/docs/user-manual/en/message-expiry.md
@@ -28,18 +28,8 @@ JMS MessageProducer allows to set a TimeToLive for the messages it sent:
producer.setTimeToLive(5000);
```
-Expired messages which are consumed from an expiry address have the following
-properties:
-
-- `_AMQ_ORIG_ADDRESS`
-
- a String property containing the *original address* of the expired
- message
-
-- `_AMQ_ORIG_QUEUE`
-
- a String property containing the *original queue* of the expired
- message
+Expired messages get [special properties](copied-message-properties.md) plus this
+additional property:
- `_AMQ_ACTUAL_EXPIRY`
@@ -123,21 +113,20 @@ an `address-setting` to configure the `expiry-address` much less
the actual `address` and `queue` to hold the expired messages.
The solution to this problem is to set the `auto-create-expiry-resources`
-`address-setting` to `true` (it's `false` by default) so that the
-broker will create the `address` and `queue` to deal with the
-expired messages automatically. The `address` created will be the
-one defined by the `expiry-address`. A `MULTICAST` `queue` will be
-created on that `address`. It will be named by the `address` to which
-the message was originally sent, and it will have a filter defined using
-the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
-receive messages sent to the relevant `address`. The `queue` name can be
-configured with a prefix and suffix. See the relevant settings in the
-table below:
+`address-setting` to `true` (it's `false` by default) so that the broker will
+create the `address` and `queue` to deal with the expired messages
+automatically. The `address` created will be the one defined by the
+`expiry-address`. A `MULTICAST` `queue` will be created on that `address`.
+It will be named by the `address` to which the message was previously sent, and
+it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
+will only receive messages sent to the relevant `address`. The `queue` name can
+be configured with a prefix and suffix. See the relevant settings in the table
+below:
`address-setting`|default
---|---
`expiry-queue-prefix`|`EXP.`
-`expiry-queue-suffix`|`` (empty string)
+`expiry-queue-suffix`|(empty string)
Here is an example configuration:
diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md
index 97f85f7..b4d1392 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -165,18 +165,7 @@ set of addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)).
### Dead Letter Properties
-Dead letter messages which are consumed from a dead letter address have
-the following properties:
-
-- `_AMQ_ORIG_ADDRESS`
-
- a String property containing the *original address* of the dead
- letter message
-
-- `_AMQ_ORIG_QUEUE`
-
- a String property containing the *original queue* of the dead letter
- message
+Dead letter messages get [special properties](copied-message-properties.md).
### Automatically Creating Dead Letter Resources
@@ -194,21 +183,20 @@ an `address-setting` to configure the `dead-letter-address` much less
the actual `address` and `queue` to hold the undelivered messages.
The solution to this problem is to set the `auto-create-dead-letter-resources`
-`address-setting` to `true` (it's `false` by default) so that the
-broker will create the `address` and `queue` to deal with the
-undelivered messages automatically. The `address` created will be the
-one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
-created on that `address`. It will be named by the `address` to which
-the message was originally sent, and it will have a filter defined using
-the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
-receive messages sent to the relevant `address`. The `queue` name can be
-configured with a prefix and suffix. See the relevant settings in the
-table below:
+`address-setting` to `true` (it's `false` by default) so that the broker will
+create the `address` and `queue` to deal with the undelivered messages
+automatically. The `address` created will be the one defined by the
+`dead-letter-address`. A `MULTICAST` `queue` will be created on that `address`.
+It will be named by the `address` to which the message was previously sent, and
+it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
+ will only receive messages sent to the relevant `address`. The `queue` name
+ can be configured with a prefix and suffix. See the relevant settings in the
+ table below:
`address-setting`|default
---|---
`dead-letter-queue-prefix`|`DLQ.`
-`dead-letter-queue-suffix`|`` (empty string)
+`dead-letter-queue-suffix`|(empty string)
Here is an example configuration:
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
index ce8fd56..09446eb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
@@ -137,18 +137,12 @@ public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport
{
// Redo the selection
- receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION
+ "\"='" + getQueueName() + "'");
+ receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION
+ "\"='" + getExpiryQueue() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
received.accept();
- /** When moving to DLQ, the original headers shoudln't be touched. */
- for (Map.Entry<String, Object> entry : annotations.entrySet()) {
- log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
- Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(),
received.getMessageAnnotation(entry.getKey()));
- }
-
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 745a6f7..b0da405 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@@ -62,7 +63,6 @@ import org.junit.Test;
public class LargeMessageTest extends LargeMessageTestBase {
-
private static final Logger log = Logger.getLogger(LargeMessageTest.class);
private static final int RECEIVE_WAIT_TIME = 10000;
@@ -229,6 +229,62 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
@Test
+ public void testDivertAndExpire() throws Exception {
+ final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+ final String DIVERTED = "diverted";
+
+ ClientSession session = null;
+
+ ActiveMQServer server = createServer(true, isNetty(), storeType);
+ server.getConfiguration().setMessageExpiryScanPeriod(100);
+
+ server.start();
+
+ server.createQueue(new QueueConfiguration(DIVERTED));
+
+ server.getAddressSettingsRepository().addMatch(DIVERTED, new AddressSettings().setExpiryDelay(250L).setExpiryAddress(SimpleString.toSimpleString(DIVERTED
+ "Expiry")).setAutoCreateExpiryResources(true));
+
+ server.deployDivert(new DivertConfiguration().setName("myDivert").setAddress(ADDRESS.toString()).setForwardingAddress(DIVERTED).setExclusive(true));
+
+ ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+ session = addClientSession(sf.createSession(false, false, false));
+
+ session.createQueue(new QueueConfiguration(ADDRESS).setDurable(false).setTemporary(true));
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ Wait.waitFor(() -> server.locateQueue(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX
+ DIVERTED) != null, 1000, 100);
+
+ ClientConsumer consumer = session.createConsumer(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX
+ DIVERTED);
+ ClientMessage msg1 = consumer.receive(1000);
+ msg1.acknowledge();
+ session.commit();
+ Assert.assertNotNull(msg1);
+
+ consumer.close();
+
+ try {
+ msg1.getBodyBuffer().readByte();
+ Assert.fail("Exception was expected");
+ } catch (final Exception ignored) {
+ // empty on purpose
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+
+ @Test
public void testDeleteOnNoBinding() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 3ee1049..29b94a1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -485,7 +485,7 @@ public class DivertTest extends ActiveMQTestBase {
instanceLog.debug("Received message " + message);
assertNotNull(message);
- if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("divert1")) {
+ if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue1")) {
countOriginal1++;
} else if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue2"))
{
countOriginal2++;
@@ -1465,4 +1465,38 @@ public class DivertTest extends ActiveMQTestBase {
server.destroyDivert(SimpleString.toSimpleString(DIVERT));
assertNull(serviceRegistry.getDivertTransformer(DIVERT, null));
}
+
+ @Test
+ public void testProperties() throws Exception {
+ final String testAddress = "testAddress";
+ final SimpleString queue = SimpleString.toSimpleString("queue");
+ final int COUNT = 25;
+
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(),
false));
+ server.start();
+
+ server.createQueue(new QueueConfiguration(queue).setAddress(testAddress + (COUNT)).setRoutingType(RoutingType.ANYCAST));
+ for (int i = 0; i < COUNT; i++) {
+ server.deployDivert(new DivertConfiguration()
+ .setName("divert" + i)
+ .setAddress(testAddress + i)
+ .setForwardingAddress(testAddress + (i + 1)));
+ }
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
+ ClientConsumer consumer1 = session.createConsumer(queue);
+ ClientMessage message = session.createMessage(false);
+ producer.send(message);
+
+ message = consumer1.receive(DivertTest.TIMEOUT);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ Assert.assertEquals("testAddress" + COUNT, message.getAddress());
+ Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 383a429..4cd8289 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1685,6 +1685,47 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
+ @Test
+ public void testCopiedMessageProperties() throws Exception {
+ final String testAddress = "testAddress";
+ final SimpleString queue = SimpleString.toSimpleString("queue");
+ final int COUNT = 5;
+
+ for (int i = 0; i < COUNT; i++) {
+ server.createQueue(new QueueConfiguration(queue.concat(Integer.toString(i))).setAddress(testAddress
+ i).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+ session.start();
+
+ ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
+ ClientMessage message = session.createMessage(durable);
+ producer.send(message);
+ producer.close();
+
+ for (int i = 0; i < COUNT - 1; i++) {
+ QueueControl queueControl = createManagementControl(SimpleString.toSimpleString(testAddress
+ i), queue.concat(Integer.toString(i)), RoutingType.ANYCAST);
+ QueueControl otherQueueControl = createManagementControl(SimpleString.toSimpleString(testAddress
+ (i + 1)), queue.concat(Integer.toString(i + 1)), RoutingType.ANYCAST);
+ assertMessageMetrics(queueControl, 1, durable);
+ assertMessageMetrics(otherQueueControl, 0, durable);
+
+ int moved = queueControl.moveMessages(null, queue.concat(Integer.toString(i + 1)).toString());
+ Assert.assertEquals(1, moved);
+ assertMessageMetrics(queueControl, 0, durable);
+ assertMessageMetrics(otherQueueControl, 1, durable);
+ }
+
+ ClientConsumer consumer1 = session.createConsumer(queue.concat(Integer.toString(COUNT
- 1)));
+ message = consumer1.receive(1000);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ System.out.println(message);
+ Assert.assertEquals(testAddress + (COUNT - 1), message.getAddress());
+ Assert.assertEquals(testAddress + (COUNT - 2), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+ }
+
/**
* <ol>
* <li>send 2 message to queue</li>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
index 80df901..484b83a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -196,8 +197,71 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase
{
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
}
- private void triggerDlaDelivery() throws Exception {
+ @Test
+ public void testDivertedMessage() throws Exception {
+ SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+ String divertAddress = "divertAddress";
+
+ server.deployDivert(new DivertConfiguration().setName("testDivert").setAddress(divertAddress).setForwardingAddress(addressA.toString()));
+
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sessionFactory = createSessionFactory(locator);
+ ClientSession session = addClientSession(sessionFactory.createSession(true, true));
+ ClientProducer producer = addClientProducer(session.createProducer(divertAddress));
+ producer.send(session.createMessage(true));
+ producer.close();
+
+ Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000,
100);
+
+ triggerDlaDelivery();
+
+ Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000,
100);
+
+ ClientConsumer consumer = session.createConsumer(dlqName);
+ session.start();
+ ClientMessage message = consumer.receive(1000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ @Test
+ public void testMovedMessage() throws Exception {
+ SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+ final SimpleString moveFromAddress = new SimpleString("moveFromAddress");
+ final SimpleString moveFromQueue = new SimpleString("moveFromQueue");
+ server.createQueue(new QueueConfiguration(moveFromQueue).setAddress(moveFromAddress).setRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sessionFactory = createSessionFactory(locator);
+ ClientSession session = addClientSession(sessionFactory.createSession(true, true));
+ ClientProducer producer = addClientProducer(session.createProducer(moveFromAddress));
+ producer.send(session.createMessage(true));
+ producer.close();
+
+ server.locateQueue(moveFromQueue).moveReferences(null, addressA, null);
+
+ Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000,
100);
+
+ triggerDlaDelivery();
+
+ Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000,
100);
+
+ ClientConsumer consumer = session.createConsumer(dlqName);
+ session.start();
+ ClientMessage message = consumer.receive(1000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ private void triggerDlaDelivery() throws Exception {
+ try {
+ server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+ } catch (Exception e) {
+ // ignore
+ }
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, false));
|