activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2649 always over-write ORIG message props
Date Mon, 01 Jun 2020 19:19:45 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7096bc1  ARTEMIS-2649 always over-write ORIG message props
     new 2d08e45  This closes #3153
7096bc1 is described below

commit 7096bc187a0182a597122ed4011397d78017014b
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Fri Mar 13 08:38:01 2020 -0500

    ARTEMIS-2649 always over-write ORIG message props
    
    ORIG message propertes like _AMQ_ORIG_ADDRESS are added to messages
    during various broker operations (e.g. diverting a message, expiring a
    message, etc.). However, if multiple operations try to set these
    properties on the same message (e.g. administratively moving a message
    which eventually gets sent to a dead-letter address) then important
    details can be lost. This is particularly problematic when using
    auto-created dead-letter or expiry resources which use filters based on
    _AMQ_ORIG_ADDRESS and can lead to message loss.
    
    This commit simply over-writes the existing ORIG properties rather than
    preserving them so that the most recent information is available.
---
 .../apache/activemq/artemis/api/core/Message.java  | 22 +-------
 .../artemis/core/server/ActiveMQServerLogger.java  |  5 ++
 .../artemis/core/server/impl/QueueImpl.java        | 14 ++++-
 docs/user-manual/en/copied-message-properties.md   | 37 ++++++++++++
 docs/user-manual/en/diverts.md                     |  7 +--
 docs/user-manual/en/message-expiry.md              | 35 ++++--------
 docs/user-manual/en/undelivered-messages.md        | 34 ++++-------
 .../amqp/DLQAfterExpiredMessageTest.java           |  8 +--
 .../tests/integration/client/LargeMessageTest.java | 58 ++++++++++++++++++-
 .../tests/integration/divert/DivertTest.java       | 36 +++++++++++-
 .../integration/management/QueueControlTest.java   | 41 ++++++++++++++
 .../server/AutoCreateDeadLetterResourcesTest.java  | 66 +++++++++++++++++++++-
 12 files changed, 279 insertions(+), 84 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b8f6303..fdb5188 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -463,25 +463,9 @@ public interface Message {
    }
 
    default void referenceOriginalMessage(final Message original, String originalQueue) {
-      Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
-
-      if (queueOnMessage != null) {
-         setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
-      } else if (originalQueue != null) {
-         setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
-      }
-
-      Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
-
-      if (originalID != null) {
-         setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
-
-         setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
-      } else {
-         setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
-
-         setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
-      }
+      setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
+      setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
+      setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
 
       // reset expiry
       setExpiration(0);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 99ab6ad..9369696 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1674,6 +1674,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222289, value = "Did not route to any matching bindings on dead-letter-address
{0} and auto-create-dead-letter-resources is true; dropping message: {1}",
+      format = Message.Format.MESSAGE_FORMAT)
+   void noMatchingBindingsOnDLAWithAutoCreateDLAResources(SimpleString address, String message);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0caf7ad..84a6140 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@@ -3477,7 +3478,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
             ref.acknowledge(tx, AckReason.KILLED, null);
          } else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress,
name);
-            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
+            RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED,
null);
+
+            // this shouldn't happen, but in case it does it's better to log a message than
just drop the message silently
+            if (status.equals(RoutingStatus.NO_BINDINGS) && server.getAddressSettingsRepository().getMatch(getAddress().toString()).isAutoCreateDeadLetterResources())
{
+               ActiveMQServerLogger.LOGGER.noMatchingBindingsOnDLAWithAutoCreateDLAResources(deadLetterAddress,
ref.toString());
+            }
             return true;
          }
       } else {
@@ -3511,7 +3517,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       }
    }
 
-   private void move(final Transaction originalTX,
+   private RoutingStatus move(final Transaction originalTX,
                      final SimpleString address,
                      final Binding binding,
                      final MessageReference ref,
@@ -3531,13 +3537,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
 
       copyMessage.setAddress(address);
 
-      postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
+      RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate,
binding);
 
       acknowledge(tx, ref, reason, consumer);
 
       if (originalTX == null) {
          tx.commit();
       }
+
+      return routingStatus;
    }
 
    /*
diff --git a/docs/user-manual/en/copied-message-properties.md b/docs/user-manual/en/copied-message-properties.md
new file mode 100644
index 0000000..1063c4b
--- /dev/null
+++ b/docs/user-manual/en/copied-message-properties.md
@@ -0,0 +1,37 @@
+# Properties for Copied Messages
+
+There are several operations within the broker that result in copying a
+message. These include:
+
+- Diverting a message from one address to another.
+- Moving an expired message from a queue to the configured `expiry-address`
+- Moving a message which has exceeded its `max-delivery-attempts` from a queue
+  to the configured `dead-letter-address`
+- Using the management API to administratively move messages from one queue to
+  another
+
+When this happens the body and properties of the original message are copied to
+a new message. However, the copying process removes some potentially important
+pieces of data so those are preserved in the following special message
+properties:
+
+- `_AMQ_ORIG_ADDRESS`
+
+  a String property containing the *original address* of the message
+
+- `_AMQ_ORIG_QUEUE`
+
+  a String property containing the *original queue* of the message
+
+- `_AMQ_ORIG_MESSAGE_ID`
+
+  a String property containing the *original message ID* of the message
+
+It's possible for the aforementioned operations to be combined. For example, a
+message may be diverted from one address to another where it lands in a queue
+and a consumer tries & fails to consume it such that the message is then sent
+to a dead-letter address. Or a message may be administratively moved from one
+queue to another where it then expires.
+
+In cases like these the `ORIG` properties will contain the information from the
+_last_ (i.e. most recent) operation.
\ No newline at end of file
diff --git a/docs/user-manual/en/diverts.md b/docs/user-manual/en/diverts.md
index a753707..1025a82 100644
--- a/docs/user-manual/en/diverts.md
+++ b/docs/user-manual/en/diverts.md
@@ -57,12 +57,7 @@ geographically distributed servers, creating your global messaging mesh.
 Diverts are defined as xml in the `broker.xml` file at the `core` attribute level.
 There can be zero or more diverts in the file.
 
-Diverted message gets a new message ID, and its address is set to a forward
-address. To access original values, use message properties: original destination
-is stored in a String property `_AMQ_ORIG_ADDRESS` (`Message.HDR_ORIGINAL_ADDRESS`
-constant from the Core API), and the original message ID in a Long property
-`_AMQ_ORIG_MESSAGE_ID` (`Message.HDR_ORIG_MESSAGE_ID` constant from the
-Core API).
+Diverted messages get [special properties](copied-message-properties.md).
 
 Please see the examples for a full working example showing you how to
 configure and use diverts.
diff --git a/docs/user-manual/en/message-expiry.md b/docs/user-manual/en/message-expiry.md
index 2d8e198..1003fd2 100644
--- a/docs/user-manual/en/message-expiry.md
+++ b/docs/user-manual/en/message-expiry.md
@@ -28,18 +28,8 @@ JMS MessageProducer allows to set a TimeToLive for the messages it sent:
 producer.setTimeToLive(5000);
 ```
 
-Expired messages which are consumed from an expiry address have the following
-properties:
-
-- `_AMQ_ORIG_ADDRESS`
-
-  a String property containing the *original address* of the expired
-  message
-
-- `_AMQ_ORIG_QUEUE`
-
-  a String property containing the *original queue* of the expired
-  message
+Expired messages get [special properties](copied-message-properties.md) plus this
+additional property:
 
 - `_AMQ_ACTUAL_EXPIRY`
 
@@ -123,21 +113,20 @@ an `address-setting` to configure the `expiry-address` much less
 the actual `address` and `queue` to hold the expired messages.
 
 The solution to this problem is to set the `auto-create-expiry-resources`
-`address-setting` to `true` (it's `false` by default) so that the
-broker will create the `address` and `queue` to deal with the
-expired messages automatically. The `address` created will be the
-one defined by the `expiry-address`. A `MULTICAST` `queue` will be
-created on that `address`. It will be named by the `address` to which
-the message was originally sent, and it will have a filter defined using
-the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
-receive messages sent to the relevant `address`. The `queue` name can be
-configured with a prefix and suffix. See the relevant settings in the
-table below:
+`address-setting` to `true` (it's `false` by default) so that the broker will
+create the `address` and `queue` to deal with the expired messages
+automatically. The `address` created will be the one defined by the
+`expiry-address`. A `MULTICAST` `queue` will be created on that `address`.
+It will be named by the `address` to which the message was previously sent, and
+it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
+will only receive messages sent to the relevant `address`. The `queue` name can
+be configured with a prefix and suffix. See the relevant settings in the table
+below:
 
 `address-setting`|default
 ---|---
 `expiry-queue-prefix`|`EXP.`
-`expiry-queue-suffix`|`` (empty string)
+`expiry-queue-suffix`|(empty string)
 
 Here is an example configuration:
 
diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md
index 97f85f7..b4d1392 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -165,18 +165,7 @@ set of addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)).
 
 ### Dead Letter Properties
 
-Dead letter messages which are consumed from a dead letter address have
-the following properties:
-
-- `_AMQ_ORIG_ADDRESS`
-
-  a String property containing the *original address* of the dead
-  letter message
-
-- `_AMQ_ORIG_QUEUE`
-
-  a String property containing the *original queue* of the dead letter
-  message
+Dead letter messages get [special properties](copied-message-properties.md).
 
 ### Automatically Creating Dead Letter Resources
 
@@ -194,21 +183,20 @@ an `address-setting` to configure the `dead-letter-address` much less
 the actual `address` and `queue` to hold the undelivered messages.
 
 The solution to this problem is to set the `auto-create-dead-letter-resources`
-`address-setting` to `true` (it's `false` by default) so that the
-broker will create the `address` and `queue` to deal with the
-undelivered messages automatically. The `address` created will be the
-one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
-created on that `address`. It will be named by the `address` to which
-the message was originally sent, and it will have a filter defined using
-the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
-receive messages sent to the relevant `address`. The `queue` name can be
-configured with a prefix and suffix. See the relevant settings in the
-table below:
+`address-setting` to `true` (it's `false` by default) so that the broker will
+create the `address` and `queue` to deal with the undelivered messages
+automatically. The `address` created will be the one defined by the
+`dead-letter-address`. A `MULTICAST` `queue` will be created on that `address`.
+It will be named by the `address` to which the message was previously sent, and
+it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
+ will only receive messages sent to the relevant `address`. The `queue` name
+ can be configured with a prefix and suffix. See the relevant settings in the
+ table below:
 
 `address-setting`|default
 ---|---
 `dead-letter-queue-prefix`|`DLQ.`
-`dead-letter-queue-suffix`|`` (empty string)
+`dead-letter-queue-suffix`|(empty string)
 
 Here is an example configuration:
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
index ce8fd56..09446eb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java
@@ -137,18 +137,12 @@ public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport
{
 
 
          // Redo the selection
-         receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION
+ "\"='" + getQueueName() + "'");
+         receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION
+ "\"='" + getExpiryQueue() + "'");
          receiverDLQ.flow(1);
          received = receiverDLQ.receive(5, TimeUnit.SECONDS);
          Assert.assertNotNull(received);
          received.accept();
 
-         /** When moving to DLQ, the original headers shoudln't be touched. */
-         for (Map.Entry<String, Object> entry : annotations.entrySet()) {
-            log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
-            Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(),
received.getMessageAnnotation(entry.getKey()));
-         }
-
          assertEquals(0, received.getTimeToLive());
          assertNotNull(received);
          assertEquals("Value1", received.getApplicationProperty("key1"));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 745a6f7..b0da405 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@@ -62,7 +63,6 @@ import org.junit.Test;
 
 public class LargeMessageTest extends LargeMessageTestBase {
 
-
    private static final Logger log = Logger.getLogger(LargeMessageTest.class);
 
    private static final int RECEIVE_WAIT_TIME = 10000;
@@ -229,6 +229,62 @@ public class LargeMessageTest extends LargeMessageTestBase {
    }
 
    @Test
+   public void testDivertAndExpire() throws Exception {
+      final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+      final String DIVERTED = "diverted";
+
+      ClientSession session = null;
+
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+
+      server.start();
+
+      server.createQueue(new QueueConfiguration(DIVERTED));
+
+      server.getAddressSettingsRepository().addMatch(DIVERTED, new AddressSettings().setExpiryDelay(250L).setExpiryAddress(SimpleString.toSimpleString(DIVERTED
+ "Expiry")).setAutoCreateExpiryResources(true));
+
+      server.deployDivert(new DivertConfiguration().setName("myDivert").setAddress(ADDRESS.toString()).setForwardingAddress(DIVERTED).setExclusive(true));
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      session = addClientSession(sf.createSession(false, false, false));
+
+      session.createQueue(new QueueConfiguration(ADDRESS).setDurable(false).setTemporary(true));
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
+
+      producer.send(clientFile);
+
+      session.commit();
+
+      session.start();
+
+      Wait.waitFor(() -> server.locateQueue(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX
+ DIVERTED) != null, 1000, 100);
+
+      ClientConsumer consumer = session.createConsumer(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX
+ DIVERTED);
+      ClientMessage msg1 = consumer.receive(1000);
+      msg1.acknowledge();
+      session.commit();
+      Assert.assertNotNull(msg1);
+
+      consumer.close();
+
+      try {
+         msg1.getBodyBuffer().readByte();
+         Assert.fail("Exception was expected");
+      } catch (final Exception ignored) {
+         // empty on purpose
+      }
+
+      session.close();
+
+      validateNoFilesOnLargeDir();
+   }
+
+   @Test
    public void testDeleteOnNoBinding() throws Exception {
       final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 3ee1049..29b94a1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -485,7 +485,7 @@ public class DivertTest extends ActiveMQTestBase {
          instanceLog.debug("Received message " + message);
          assertNotNull(message);
 
-         if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("divert1")) {
+         if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue1")) {
             countOriginal1++;
          } else if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue2"))
{
             countOriginal2++;
@@ -1465,4 +1465,38 @@ public class DivertTest extends ActiveMQTestBase {
       server.destroyDivert(SimpleString.toSimpleString(DIVERT));
       assertNull(serviceRegistry.getDivertTransformer(DIVERT, null));
    }
+
+   @Test
+   public void testProperties() throws Exception {
+      final String testAddress = "testAddress";
+      final SimpleString queue = SimpleString.toSimpleString("queue");
+      final int COUNT = 25;
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(),
false));
+      server.start();
+
+      server.createQueue(new QueueConfiguration(queue).setAddress(testAddress + (COUNT)).setRoutingType(RoutingType.ANYCAST));
+      for (int i = 0; i < COUNT; i++) {
+         server.deployDivert(new DivertConfiguration()
+                                .setName("divert" + i)
+                                .setAddress(testAddress + i)
+                                .setForwardingAddress(testAddress + (i + 1)));
+      }
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+      session.start();
+
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
+      ClientConsumer consumer1 = session.createConsumer(queue);
+      ClientMessage message = session.createMessage(false);
+      producer.send(message);
+
+      message = consumer1.receive(DivertTest.TIMEOUT);
+      Assert.assertNotNull(message);
+      message.acknowledge();
+      Assert.assertEquals("testAddress" + COUNT, message.getAddress());
+      Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 383a429..4cd8289 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1685,6 +1685,47 @@ public class QueueControlTest extends ManagementTestBase {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testCopiedMessageProperties() throws Exception {
+      final String testAddress = "testAddress";
+      final SimpleString queue = SimpleString.toSimpleString("queue");
+      final int COUNT = 5;
+
+      for (int i = 0; i < COUNT; i++) {
+         server.createQueue(new QueueConfiguration(queue.concat(Integer.toString(i))).setAddress(testAddress
+ i).setRoutingType(RoutingType.ANYCAST));
+      }
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+      session.start();
+
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
+      ClientMessage message = session.createMessage(durable);
+      producer.send(message);
+      producer.close();
+
+      for (int i = 0; i < COUNT - 1; i++) {
+         QueueControl queueControl = createManagementControl(SimpleString.toSimpleString(testAddress
+ i), queue.concat(Integer.toString(i)), RoutingType.ANYCAST);
+         QueueControl otherQueueControl = createManagementControl(SimpleString.toSimpleString(testAddress
+ (i + 1)), queue.concat(Integer.toString(i + 1)), RoutingType.ANYCAST);
+         assertMessageMetrics(queueControl, 1, durable);
+         assertMessageMetrics(otherQueueControl, 0, durable);
+
+         int moved = queueControl.moveMessages(null, queue.concat(Integer.toString(i + 1)).toString());
+         Assert.assertEquals(1, moved);
+         assertMessageMetrics(queueControl, 0, durable);
+         assertMessageMetrics(otherQueueControl, 1, durable);
+      }
+
+      ClientConsumer consumer1 = session.createConsumer(queue.concat(Integer.toString(COUNT
- 1)));
+      message = consumer1.receive(1000);
+      Assert.assertNotNull(message);
+      message.acknowledge();
+      System.out.println(message);
+      Assert.assertEquals(testAddress + (COUNT - 1), message.getAddress());
+      Assert.assertEquals(testAddress + (COUNT - 2), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+   }
+
    /**
     * <ol>
     * <li>send 2 message to queue</li>
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
index 80df901..484b83a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AutoCreateDeadLetterResourcesTest.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -196,8 +197,71 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase
{
       assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
    }
 
-   private void triggerDlaDelivery() throws Exception {
+   @Test
+   public void testDivertedMessage() throws Exception {
+      SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+      String divertAddress = "divertAddress";
+
+      server.deployDivert(new DivertConfiguration().setName("testDivert").setAddress(divertAddress).setForwardingAddress(addressA.toString()));
+
       server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sessionFactory = createSessionFactory(locator);
+      ClientSession session = addClientSession(sessionFactory.createSession(true, true));
+      ClientProducer producer = addClientProducer(session.createProducer(divertAddress));
+      producer.send(session.createMessage(true));
+      producer.close();
+
+      Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000,
100);
+
+      triggerDlaDelivery();
+
+      Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000,
100);
+
+      ClientConsumer consumer = session.createConsumer(dlqName);
+      session.start();
+      ClientMessage message = consumer.receive(1000);
+      assertNotNull(message);
+      message.acknowledge();
+   }
+
+   @Test
+   public void testMovedMessage() throws Exception {
+      SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
+      final SimpleString moveFromAddress = new SimpleString("moveFromAddress");
+      final SimpleString moveFromQueue = new SimpleString("moveFromQueue");
+      server.createQueue(new QueueConfiguration(moveFromQueue).setAddress(moveFromAddress).setRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sessionFactory = createSessionFactory(locator);
+      ClientSession session = addClientSession(sessionFactory.createSession(true, true));
+      ClientProducer producer = addClientProducer(session.createProducer(moveFromAddress));
+      producer.send(session.createMessage(true));
+      producer.close();
+
+      server.locateQueue(moveFromQueue).moveReferences(null, addressA, null);
+
+      Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000,
100);
+
+      triggerDlaDelivery();
+
+      Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000,
100);
+
+      ClientConsumer consumer = session.createConsumer(dlqName);
+      session.start();
+      ClientMessage message = consumer.receive(1000);
+      assertNotNull(message);
+      message.acknowledge();
+   }
+
+   private void triggerDlaDelivery() throws Exception {
+      try {
+         server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
+      } catch (Exception e) {
+         // ignore
+      }
       ServerLocator locator = createInVMNonHALocator();
       ClientSessionFactory sessionFactory = createSessionFactory(locator);
       ClientSession session = addClientSession(sessionFactory.createSession(true, false));


Mime
View raw message