activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2364 collision avoidance for redelivery
Date Mon, 26 Aug 2019 03:41:19 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 449f032  ARTEMIS-2364 collision avoidance for redelivery
     new a848572  This closes #2691
449f032 is described below

commit 449f0323ecf0bb540e1303d31c77ff499a6ef5b1
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Fri May 31 15:20:29 2019 -0500

    ARTEMIS-2364 collision avoidance for redelivery
    
    This is a feature from 5.x implemented via
    https://issues.apache.org/jira/browse/AMQ-747.
---
 .../artemis/core/config/impl/Validators.java       | 12 +++
 .../deployers/impl/FileConfigurationParser.java    |  7 ++
 .../artemis/core/server/ActiveMQMessageBundle.java |  3 +
 .../artemis/core/server/impl/QueueImpl.java        |  8 ++
 .../core/settings/impl/AddressSettings.java        | 32 +++++++
 .../resources/schema/artemis-configuration.xsd     |  8 ++
 .../core/config/impl/FileConfigurationTest.java    |  2 +
 .../resources/ConfigurationTest-full-config.xml    |  1 +
 ...rationTest-xinclude-config-address-settings.xml |  1 +
 .../src/test/resources/artemis-configuration.xsd   |  8 ++
 docs/user-manual/en/address-model.md               |  6 ++
 docs/user-manual/en/configuration-index.md         |  7 +-
 docs/user-manual/en/undelivered-messages.md        | 58 ++++++++++---
 .../integration/client/RedeliveryConsumerTest.java | 98 ++++++++++++++++++++++
 14 files changed, 238 insertions(+), 13 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index a5d41ee..52ec5db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -98,6 +98,18 @@ public final class Validators {
       }
    };
 
+   public static final Validator LE_ONE = new Validator() {
+      @Override
+      public void validate(final String name, final Object value) {
+         Number val = (Number) value;
+         if (val.doubleValue() <= 1) {
+            // OK
+         } else {
+            throw ActiveMQMessageBundle.BUNDLE.lessThanOrEqualToOne(name, val);
+         }
+      }
+   };
+
    public static final Validator MINUS_ONE_OR_GT_ZERO = new Validator() {
       @Override
       public void validate(final String name, final Object value) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 3b9062d..9b23c29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -166,6 +166,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
{
 
    private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";
 
+   private static final String REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME = "redelivery-collision-avoidance-factor";
+
    private static final String MAX_REDELIVERY_DELAY_NODE_NAME = "max-redelivery-delay";
 
    private static final String MAX_DELIVERY_ATTEMPTS = "max-delivery-attempts";
@@ -1046,6 +1048,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
{
             addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
          } else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setRedeliveryMultiplier(XMLUtil.parseDouble(child));
+         } else if (REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME.equalsIgnoreCase(name))
{
+            double redeliveryCollisionAvoidanceFactor = XMLUtil.parseDouble(child);
+            Validators.GE_ZERO.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME,
redeliveryCollisionAvoidanceFactor);
+            Validators.LE_ONE.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
+            addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
          } else if (MAX_REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
          } else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index c8eca81..8f40777 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -479,4 +479,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229227, value = "{0}  must be equals to -1 or greater than 0 and less than
or equal to Integer.MAX_VALUE (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
    IllegalArgumentException inRangeOfPositiveIntThanMinusOne(String name, Number val);
+
+   @Message(id = 229228, value = "{0} must be less than or equal to 1 (actual value: {1})",
format = Message.Format.MESSAGE_FORMAT)
+   IllegalArgumentException lessThanOrEqualToOne(String name, Number val);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7e8d82e..ea2ce33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -28,10 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -3617,9 +3619,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue
{
       long redeliveryDelay = addressSettings.getRedeliveryDelay();
       long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();
       double redeliveryMultiplier = addressSettings.getRedeliveryMultiplier();
+      double collisionAvoidanceFactor = addressSettings.getRedeliveryCollisionAvoidanceFactor();
 
       int tmpDeliveryCount = deliveryCount > 0 ? deliveryCount - 1 : 0;
       long delay = (long) (redeliveryDelay * (Math.pow(redeliveryMultiplier, tmpDeliveryCount)));
+      if (collisionAvoidanceFactor > 0) {
+         Random random = ThreadLocalRandom.current();
+         double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor)
* random.nextDouble();
+         delay += (delay * variance);
+      }
 
       if (delay > maxRedeliveryDelay) {
          delay = maxRedeliveryDelay;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 625d652..0129ec7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -54,6 +54,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
 
    public static final double DEFAULT_REDELIVER_MULTIPLIER = 1.0;
 
+   public static final double DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR = 0.0;
+
    public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
 
    @Deprecated
@@ -125,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
 
    private Double redeliveryMultiplier = null;
 
+   private Double redeliveryCollisionAvoidanceFactor = null;
+
    private Long maxRedeliveryDelay = null;
 
    private SimpleString deadLetterAddress = null;
@@ -223,6 +227,7 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       this.messageCounterHistoryDayLimit = other.messageCounterHistoryDayLimit;
       this.redeliveryDelay = other.redeliveryDelay;
       this.redeliveryMultiplier = other.redeliveryMultiplier;
+      this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
       this.maxRedeliveryDelay = other.maxRedeliveryDelay;
       this.deadLetterAddress = other.deadLetterAddress;
       this.expiryAddress = other.expiryAddress;
@@ -566,6 +571,15 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       return this;
    }
 
+   public double getRedeliveryCollisionAvoidanceFactor() {
+      return redeliveryCollisionAvoidanceFactor != null ? redeliveryCollisionAvoidanceFactor
: AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR;
+   }
+
+   public AddressSettings setRedeliveryCollisionAvoidanceFactor(final double redeliveryCollisionAvoidanceFactor)
{
+      this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
+      return this;
+   }
+
    public long getMaxRedeliveryDelay() {
       // default is redelivery-delay * 10 as specified on the docs and at this JIRA:
       // https://issues.jboss.org/browse/HORNETQ-1263
@@ -776,6 +790,9 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       if (redeliveryMultiplier == null) {
          redeliveryMultiplier = merged.redeliveryMultiplier;
       }
+      if (redeliveryCollisionAvoidanceFactor == null) {
+         redeliveryCollisionAvoidanceFactor = merged.redeliveryCollisionAvoidanceFactor;
+      }
       if (maxRedeliveryDelay == null) {
          maxRedeliveryDelay = merged.maxRedeliveryDelay;
       }
@@ -1059,6 +1076,10 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       if (buffer.readableBytes() > 0) {
          defaultRingSize = BufferHelper.readNullableLong(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         redeliveryCollisionAvoidanceFactor = BufferHelper.readNullableDouble(buffer);
+      }
    }
 
    @Override
@@ -1073,6 +1094,7 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
          BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
          BufferHelper.sizeOfNullableLong(redeliveryDelay) +
          BufferHelper.sizeOfNullableDouble(redeliveryMultiplier) +
+         BufferHelper.sizeOfNullableDouble(redeliveryCollisionAvoidanceFactor) +
          BufferHelper.sizeOfNullableLong(maxRedeliveryDelay) +
          SimpleString.sizeofNullableString(deadLetterAddress) +
          SimpleString.sizeofNullableString(expiryAddress) +
@@ -1210,6 +1232,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
 
       BufferHelper.writeNullableLong(buffer, defaultRingSize);
 
+      BufferHelper.writeNullableDouble(buffer, redeliveryCollisionAvoidanceFactor);
+
    }
 
    /* (non-Javadoc)
@@ -1235,6 +1259,7 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
       result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
       result = prime * result + ((redeliveryMultiplier == null) ? 0 : redeliveryMultiplier.hashCode());
+      result = prime * result + ((redeliveryCollisionAvoidanceFactor == null) ? 0 : redeliveryCollisionAvoidanceFactor.hashCode());
       result = prime * result + ((maxRedeliveryDelay == null) ? 0 : maxRedeliveryDelay.hashCode());
       result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
       result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
@@ -1363,6 +1388,11 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
             return false;
       } else if (!redeliveryMultiplier.equals(other.redeliveryMultiplier))
          return false;
+      if (redeliveryCollisionAvoidanceFactor == null) {
+         if (other.redeliveryCollisionAvoidanceFactor != null)
+            return false;
+      } else if (!redeliveryCollisionAvoidanceFactor.equals(other.redeliveryCollisionAvoidanceFactor))
+         return false;
       if (maxRedeliveryDelay == null) {
          if (other.maxRedeliveryDelay != null)
             return false;
@@ -1580,6 +1610,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
          redeliveryDelay +
          ", redeliveryMultiplier=" +
          redeliveryMultiplier +
+         ", redeliveryCollisionAvoidanceFactor=" +
+         redeliveryCollisionAvoidanceFactor +
          ", maxRedeliveryDelay=" +
          maxRedeliveryDelay +
          ", redistributionDelay=" +
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index a9e4be6..fea5cc6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2991,6 +2991,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double"
maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     factor by which to modify the redelivery delay slightly to avoid collisions
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index a16d8c0..e0f6372 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -343,6 +343,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
       assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
       assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
+      assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(),
0);
       assertEquals(856686592L, conf.getAddressesSettings().get("a1").getMaxSizeBytes());
       assertEquals(817381738L, conf.getAddressesSettings().get("a1").getPageSizeBytes());
       assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
@@ -365,6 +366,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
       assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
       assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
+      assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(),
0);
       assertEquals(932489234928324L, conf.getAddressesSettings().get("a2").getMaxSizeBytes());
       assertEquals(712671626L, conf.getAddressesSettings().get("a2").getPageSizeBytes());
       assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 9b213cc..d0843cd 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -383,6 +383,7 @@
             <dead-letter-address>a1.1</dead-letter-address>
             <expiry-address>a1.2</expiry-address>
             <redelivery-delay>1</redelivery-delay>
+            <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
             <max-size-bytes>817M</max-size-bytes>
             <page-size-bytes>817381738</page-size-bytes>
             <page-max-cache-size>10</page-max-cache-size>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 49b625a..92dee3f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -19,6 +19,7 @@
       <dead-letter-address>a1.1</dead-letter-address>
       <expiry-address>a1.2</expiry-address>
       <redelivery-delay>1</redelivery-delay>
+      <redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
       <max-size-bytes>817M</max-size-bytes>
       <page-size-bytes>817381738</page-size-bytes>
       <page-max-cache-size>10</page-max-cache-size>
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 64f32cd..6b3f261 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -2957,6 +2957,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double"
maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     factor by which to modify the redelivery delay slightly to avoid collisions
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 7512522..0cbe809 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -573,6 +573,7 @@ that would be found in the `broker.xml` file.
       <expiry-delay>123</expiry-delay>
       <redelivery-delay>5000</redelivery-delay>
       <redelivery-delay-multiplier>1.0</redelivery-delay-multiplier>
+      <redelivery-collision-avoidance-factor>0.0</redelivery-collision-avoidance-factor>
       <max-redelivery-delay>10000</max-redelivery-delay>
       <max-delivery-attempts>3</max-delivery-attempts>
       <max-size-bytes>100000</max-size-bytes>
@@ -660,6 +661,11 @@ messages](undelivered-messages.md#configuring-delayed-redelivery).
 Default is `1.0`. Read more about [undelivered
 messages](undelivered-messages.md#configuring-delayed-redelivery).
 
+`redelivery-collision-avoidance-factor` defines an additional factor used to
+calculate an adjustment to the `redelivery-delay` (up or down). Default is
+`0.0`. Valid values are between 0.0 and 1.0. Read more about [undelivered
+messages](undelivered-messages.md#configuring-delayed-redelivery).
+
 `max-size-bytes`, `page-size-bytes`, & `page-max-cache-size` are used to
 configure paging on an address. This is explained
 [here](paging.md#configuration).
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 4289e12..0deb104 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -204,10 +204,11 @@ Name | Description | Default
 [match](address-model.md) | The filter to apply to the setting | n/a
 [dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
 [expiry-address](message-expiry.md) | Expired messages address | n/a
-[expiry-delay](address-model.md) | Expiration time override; -1 don't override | -1
+[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
 [redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message
| 0
-[redelivery-delay-multiplier](address-model.md) | Multiplier to apply to the `redelivery-delay`
| 1.0
-[max-redelivery-delay](address-model.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
+[redelivery-delay-multiplier](undelivered-messages.md) | Multiplier to apply to the `redelivery-delay`
| 1.0
+[redelivery-collision-avoidance-factor](undelivered-messages.md) | an additional factor used
to calculate an adjustment to the `redelivery-delay` (up or down) | 0.0
+[max-redelivery-delay](undelivered-messages.md) | Max value for the `redelivery-delay` |
10 \* `redelivery-delay`
 [max-delivery-attempts](undelivered-messages.md)| Number of retries before dead letter address|
10
 [max-size-bytes](paging.md)| Max size a queue can be before invoking `address-full-policy`
| -1
 [max-size-bytes-reject-threshold]() | Used with `BLOCK`, the max size an address can reach
before messages are rejected; works in combination with `max-size-bytes` **for AMQP clients
only**. | -1
diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md
index 5ef5565..a6ca2e2 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -42,6 +42,8 @@ Delayed redelivery is defined in the address-setting configuration:
    <redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
    <!-- default is 0 (no delay) -->
    <redelivery-delay>5000</redelivery-delay>
+   <!-- default is 0.0) -->
+   <redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
    <!-- default is redelivery-delay * 10 -->
    <max-redelivery-delay>50000</max-redelivery-delay>
 </address-setting>
@@ -59,24 +61,60 @@ message will be sent asynchronously back to the queue after the delay.
 You can specify a multiplier (the `redelivery-delay-multiplier`) that will
 take effect on top of the `redelivery-delay`.  Each time a message is redelivered
 the delay period will be equal to the previous delay * `redelivery-delay-multiplier`.
-A max-redelivery-delay can be set to prevent the delay from becoming too large.
-The max-redelivery-delay is defaulted to redelivery-delay \* 10.
+A `max-redelivery-delay` can be set to prevent the delay from becoming too large.
+The `max-redelivery-delay` is defaulted to `redelivery-delay` \* 10.
 
-Example:
+**Example:**
 
-    - redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000
+- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000,
+  redelivery-collision-avoidance-factor=0.0
 
-    1. Delivery Attempt 1. (Unsuccessful)
-    2. Wait Delay Period: 5000
-    3. Delivery Attempt 2. (Unsuccessful)
-    4. Wait Delay Period: 10000                   // (5000  * 2) < max-delay-period. 
Use 10000
-    5. Delivery Attempt 3: (Unsuccessful)
-    6. Wait Delay Period: 15000                   // (10000 * 2) > max-delay-period: 
Use max-delay-delivery
+1. Delivery Attempt 1. (Unsuccessful)
+2. Wait Delay Period: 5000
+3. Delivery Attempt 2. (Unsuccessful)
+4. Wait Delay Period: 10000                   // (5000  * 2) < max-delay-period.  Use
10000
+5. Delivery Attempt 3: (Unsuccessful)
+6. Wait Delay Period: 15000                   // (10000 * 2) > max-delay-period:  Use
max-delay-delivery
 
 Address wildcards can be used to configure redelivery delay for a set of
 addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)), so you don't have
to specify redelivery delay
 individually for each address.
 
+The `redelivery-delay` can be also be modified by configuring the
+`redelivery-collision-avoidance-factor`. This factor will be made either
+positive or negative at random to control whether the ultimate value will
+increase or decrease the `redelivery-delay`. Then it's multiplied by a random
+number between 0.0 and 1.0. This result is then multiplied by the
+`redelivery-delay` and then added to the `redelivery-delay` to arrive at the
+final value.
+
+The algorithm may sound complicated but the bottom line is quite simple: the
+larger `redelivery-collision-avoidance-factor` you choose the larger the variance
+of the `redelivery-delay` will be. The `redelivery-collision-avoidance-factor`
+must be between 0.0 and 1.0.
+
+**Example:**
+
+- redelivery-delay=1000, redelivery-delay-multiplier=1, max-redelivery-delay=15000,
+  redelivery-collision-avoidance-factor=0.5, (bold values chosen using
+  `java.util.Random`)
+
+1. Delivery Attempt 1. (Unsuccessful)
+2. Wait Delay Period: 875                     // 1000 + (1000 * ((0.5 * __-1__) * __.25__)
+3. Delivery Attempt 2. (Unsuccessful)
+4. Wait Delay Period: 1375                    // 1000 + (1000 * ((0.5 * __1__) * __.75__)
+5. Delivery Attempt 3: (Unsuccessful)
+6. Wait Delay Period: 975                     // 1000 + (1000 * ((0.5 * __-1__) * __.05__)
+
+This feature can be particularly useful in environments where there are
+multiple consumers on the same queue all interacting transactionally
+with the same external system (e.g. a database). If there is overlapping
+data in messages which are consumed concurrently then one transaction can
+succeed while all the rest fail. If those failed messages are redelivered
+at the same time then this process where one consumer succeeds and the
+rest fail will continue. By randomly padding the redelivery-delay by a
+small, configurable amount these redelivery "collisions" can be avoided.
+
 ### Example
 
 See [the examples chapter](examples.md) for an example which shows how delayed redelivery
is configured
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
index cfe6fa9..69e516e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RedeliveryConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.client;
 
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -302,6 +303,103 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase {
 
    }
 
+   @Test
+   public void testRedeliveryCollisionAvoidance() throws Exception {
+      setUp(false);
+      int numberOfThreads = 10;
+      long redeliveryDelay = 1000;
+      server.getAddressSettingsRepository().getMatch(ADDRESS.toString()).setRedeliveryDelay(redeliveryDelay).setRedeliveryCollisionAvoidanceFactor(0.5);
+
+      ClientSession session = factory.createSession(false, false, false);
+      ClientProducer prod = session.createProducer(ADDRESS);
+      for (int i = 0; i < numberOfThreads; i++) {
+         prod.send(createTextMessage(session, "Hello" + i));
+      }
+      session.commit();
+      session.close();
+
+      final CountDownLatch aligned = new CountDownLatch(numberOfThreads);
+      final CountDownLatch startRollback = new CountDownLatch(1);
+
+      class ConsumerThread extends Thread {
+
+         ConsumerThread(int i) {
+            super("RedeliveryCollisionAvoidance::" + i);
+         }
+
+         long delay = 0;
+         int errors = 0;
+
+         @Override
+         public void run() {
+            try (ServerLocator locator = createInVMNonHALocator()) {
+               locator.setConsumerWindowSize(0);
+               ClientSessionFactory factory = locator.createSessionFactory();
+               ClientSession session = factory.createSession(false, false, false);
+               session.start();
+               ClientConsumer consumer = session.createConsumer(ADDRESS);
+               ClientMessage msg = consumer.receive(5000);
+               assertNotNull(msg);
+               msg.acknowledge();
+               aligned.countDown();
+               startRollback.await();
+               session.rollback();
+               long start = System.currentTimeMillis();
+               msg = consumer.receive(5000);
+               delay = System.currentTimeMillis() - start;
+               assertNotNull(msg);
+               msg.acknowledge();
+               session.commit();
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors++;
+            }
+         }
+      }
+
+      ConsumerThread[] threads = new ConsumerThread[numberOfThreads];
+
+      for (int i = 0; i < numberOfThreads; i++) {
+         threads[i] = new ConsumerThread(i);
+         threads[i].start();
+      }
+
+      aligned.await();
+      startRollback.countDown();
+
+      try {
+         for (ConsumerThread t : threads) {
+            t.join(60000);
+            assertFalse(t.isAlive());
+            assertEquals("There are Errors on the test thread", 0, t.errors);
+         }
+      } finally {
+         for (ConsumerThread t : threads) {
+            if (t.isAlive()) {
+               t.interrupt();
+            }
+            t.join(1000);
+         }
+      }
+
+      long maxDelay = 0;
+      long minDelay = Long.MAX_VALUE;
+
+      for (ConsumerThread t : threads) {
+         if (t.delay < minDelay) {
+            minDelay = t.delay;
+         }
+         if (t.delay > maxDelay) {
+            maxDelay = t.delay;
+         }
+      }
+
+      // make sure the difference between the minimum redelivery delay and the maximum redelivery
delay is larger that the expected nominal variance
+      assertTrue((maxDelay - minDelay) > (redeliveryDelay * .05));
+
+      factory.close();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------


Mime
View raw message