activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [activemq] branch master updated: AMQ-7298 - rework redelivery message tracking to ensure no duplicate suppression (and dlq) in error for local transaction batches that failover
Date Wed, 04 Sep 2019 11:24:58 GMT
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 0254877  AMQ-7298 - rework redelivery message tracking to ensure no duplicate suppression
(and dlq) in error for local transaction batches that failover
0254877 is described below

commit 02548777c2954f5ccad918df8b1280fd601f8fd6
Author: gtully <gary.tully@gmail.com>
AuthorDate: Wed Sep 4 12:24:42 2019 +0100

    AMQ-7298 - rework redelivery message tracking to ensure no duplicate suppression (and
dlq) in error for local transaction batches that failover
---
 .../apache/activemq/ActiveMQMessageConsumer.java   | 155 +++++----
 .../java/org/apache/activemq/ActiveMQSession.java  |   3 -
 .../java/org/apache/activemq/bugs/AMQ2149Test.java |  59 +++-
 .../FailoverDurableSubTransactionTest.java         | 368 +++++++++++++++++++++
 .../usecases/TopicProducerFlowControlTest.java     |   7 +-
 5 files changed, 515 insertions(+), 77 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 5c7015b..19dfcf8 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -111,6 +111,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
             this.transactionId = transactionId;
         }
     }
+    class PreviouslyDelivered {
+        org.apache.activemq.command.Message message;
+        boolean redelivered;
+
+        PreviouslyDelivered(MessageDispatch messageDispatch) {
+            message = messageDispatch.getMessage();
+        }
+
+        PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered) {
+            message = messageDispatch.getMessage();
+            this.redelivered = redelivered;
+        }
+    }
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
     protected final ActiveMQSession session;
@@ -124,7 +137,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
     // Always walk list in reverse order.
     protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
     // track duplicate deliveries in a transaction such that the tx integrity can be validated
-    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
+    private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
     private int deliveredCounter;
     private int additionalWindowSize;
     private long redeliveryDelay;
@@ -144,7 +157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
     private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
     private ExecutorService executorService;
     private MessageTransformer transformer;
-    private boolean clearDeliveredList;
+    private volatile boolean clearDeliveredList;
     AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
     private MessageAck pendingAck;
@@ -766,14 +779,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                     }
                     // allow dispatch on this connection to resume
                     session.connection.transportInterruptionProcessingComplete();
-                    inProgressClearRequiredFlag.decrementAndGet();
+                    inProgressClearRequiredFlag.set(0);
 
                     // Wake up any blockers and allow them to recheck state.
                     unconsumedMessages.getMutex().notifyAll();
                 }
             }
+            clearDeliveredList();
         }
-        clearDeliveredList();
     }
 
     void deliverAcks() {
@@ -869,6 +882,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                 }
             }
         }
+        if (previouslyDeliveredMessages != null) {
+            for (PreviouslyDelivered previouslyDelivered : previouslyDeliveredMessages.values())
{
+                session.connection.rollbackDuplicate(this, previouslyDelivered.message);
+            }
+        }
+        clearPreviouslyDelivered();
     }
 
     /**
@@ -1141,8 +1160,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                 numberNotReplayed = 0;
                 synchronized(deliveredMessages) {
                     if (previouslyDeliveredMessages != null) {
-                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
-                            if (!entry.getValue()) {
+                        for (PreviouslyDelivered entry: previouslyDeliveredMessages.values())
{
+                            if (!entry.redelivered) {
                                 numberNotReplayed++;
                             }
                         }
@@ -1169,11 +1188,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
             // if any previously delivered messages was not re-delivered, transaction is
invalid and must rollback
             // as messages have been dispatched else where.
             int numberNotReplayed = 0;
-            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
-                if (!entry.getValue()) {
+            for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
+                if (!entry.redelivered) {
                     numberNotReplayed++;
                     LOG.debug("previously delivered message has not been replayed in transaction:
{}, messageId: {}",
-                              previouslyDeliveredMessages.transactionId, entry.getKey());
+                              previouslyDeliveredMessages.transactionId, entry.message.getMessageId());
                 }
             }
             if (numberNotReplayed > 0) {
@@ -1244,8 +1263,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator();
iter.hasNext();) {
                     MessageDispatch md = iter.next();
                     md.getMessage().onMessageRolledBack();
-                    // ensure we don't filter this as a duplicate
-                    session.connection.rollbackDuplicate(this, md.getMessage());
                 }
 
                 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@@ -1275,17 +1292,18 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                         session.sendAck(ack,true);
                     }
 
-                    // stop the delivery of messages.
-                    if (nonBlockingRedelivery) {
-                        if (!unconsumedMessages.isClosed()) {
+                    final LinkedList<MessageDispatch> pendingSessionRedelivery =
+                            new LinkedList<MessageDispatch>(deliveredMessages);
+
+                    captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false);
 
-                            final LinkedList<MessageDispatch> pendingRedeliveries =
-                                new LinkedList<MessageDispatch>(deliveredMessages);
+                    deliveredCounter -= deliveredMessages.size();
+                    deliveredMessages.clear();
 
-                            Collections.reverse(pendingRedeliveries);
+                    if (!unconsumedMessages.isClosed()) {
 
-                            deliveredCounter -= deliveredMessages.size();
-                            deliveredMessages.clear();
+                        if (nonBlockingRedelivery) {
+                            Collections.reverse(pendingSessionRedelivery);
 
                             // Start up the delivery again a little later.
                             session.getScheduler().executeAfterDelay(new Runnable() {
@@ -1293,7 +1311,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                 public void run() {
                                     try {
                                         if (!unconsumedMessages.isClosed()) {
-                                            for(MessageDispatch dispatch : pendingRedeliveries)
{
+                                            for(MessageDispatch dispatch : pendingSessionRedelivery)
{
                                                 session.dispatch(dispatch);
                                             }
                                         }
@@ -1302,42 +1320,52 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                     }
                                 }
                             }, redeliveryDelay);
-                        }
-
-                    } else {
-                        unconsumedMessages.stop();
 
-                        for (MessageDispatch md : deliveredMessages) {
-                            unconsumedMessages.enqueueFirst(md);
-                        }
+                        } else {
+                            // stop the delivery of messages.
+                            unconsumedMessages.stop();
 
-                        deliveredCounter -= deliveredMessages.size();
-                        deliveredMessages.clear();
+                            final ActiveMQMessageConsumer dispatcher = this;
 
-                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed())
{
-                            // Start up the delivery again a little later.
-                            session.getScheduler().executeAfterDelay(new Runnable() {
+                            Runnable redispatchWork = new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        if (started.get()) {
-                                            start();
+                                        if (!unconsumedMessages.isClosed()) {
+                                            synchronized (unconsumedMessages.getMutex())
{
+                                                for (MessageDispatch md : pendingSessionRedelivery)
{
+                                                    unconsumedMessages.enqueueFirst(md);
+                                                }
+
+                                                if (messageListener.get() != null) {
+                                                    session.redispatch(dispatcher, unconsumedMessages);
+                                                }
+                                            }
+                                            if (started.get()) {
+                                                start();
+                                            }
                                         }
                                     } catch (JMSException e) {
                                         session.connection.onAsyncException(e);
                                     }
                                 }
-                            }, redeliveryDelay);
-                        } else {
-                            start();
+                            };
+
+                            if (redeliveryDelay > 0 && !unconsumedMessages.isClosed())
{
+                                // Start up the delivery again a little later.
+                                session.getScheduler().executeAfterDelay(redispatchWork,
redeliveryDelay);
+                            } else {
+                                redispatchWork.run();
+                            }
+                        }
+                    } else {
+                        for (MessageDispatch md : pendingSessionRedelivery) {
+                            session.connection.rollbackDuplicate(this, md.getMessage());
                         }
                     }
                 }
             }
         }
-        if (messageListener.get() != null) {
-            session.redispatch(this, unconsumedMessages);
-        }
     }
 
     /*
@@ -1347,10 +1375,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
      */
     private void rollbackPreviouslyDeliveredAndNotRedelivered() {
         if (previouslyDeliveredMessages != null) {
-            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
-                if (!entry.getValue()) {
-                    LOG.trace("rollback non redelivered: {}" + entry.getKey());
-                    removeFromDeliveredMessages(entry.getKey());
+            for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
+                if (!entry.redelivered) {
+                    LOG.trace("rollback non redelivered: {}", entry.message.getMessageId());
+                    removeFromDeliveredMessages(entry.message.getMessageId());
                 }
             }
             clearPreviouslyDelivered();
@@ -1390,7 +1418,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
             clearDeliveredList();
             synchronized (unconsumedMessages.getMutex()) {
                 if (!unconsumedMessages.isClosed()) {
-                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
{
+                    // deliverySequenceId non zero means previously queued dispatch
+                    if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this,
md.getMessage())) {
                         if (listener != null && unconsumedMessages.isRunning()) {
                             if (redeliveryExceeded(md)) {
                                 posionAck(md, "listener dispatch[" + md.getRedeliveryCounter()
+ "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
@@ -1416,11 +1445,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                                 }
                             }
                         } else {
-                            if (!unconsumedMessages.isRunning()) {
-                                // delayed redelivery, ensure it can be re delivered
-                                session.connection.rollbackDuplicate(this, md.getMessage());
-                            }
-
+                            md.setDeliverySequenceId(-1); // skip duplicate check on subsequent
queued delivery
                             if (md.getMessage() == null) {
                                 // End of browse or pull request timeout.
                                 unconsumedMessages.enqueue(md);
@@ -1476,9 +1501,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
         if (session.isTransacted()) {
             synchronized (deliveredMessages) {
                 if (previouslyDeliveredMessages != null) {
-                    if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId()))
{
+                    PreviouslyDelivered entry;
+                    if ((entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId()))
!= null) {
                         if (markReceipt) {
-                            previouslyDeliveredMessages.put(md.getMessage().getMessageId(),
true);
+                            entry.redelivered = true;
                         }
                         return true;
                     }
@@ -1506,15 +1532,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
                 if (clearDeliveredList) {
                     if (!deliveredMessages.isEmpty()) {
                         if (session.isTransacted()) {
-
-                            if (previouslyDeliveredMessages == null) {
-                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,
Boolean>(session.getTransactionContext().getTransactionId());
-                            }
-                            for (MessageDispatch delivered : deliveredMessages) {
-                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(),
false);
-                            }
-                            LOG.debug("{} tracking existing transacted {} delivered list
({}) on transport interrupt",
-                                      getConsumerId(), previouslyDeliveredMessages.transactionId,
deliveredMessages.size());
+                            captureDeliveredMessagesForDuplicateSuppression();
                         } else {
                             if (session.isClientAcknowledge()) {
                                 LOG.debug("{} rolling back delivered list ({}) on transport
interrupt", getConsumerId(), deliveredMessages.size());
@@ -1536,6 +1554,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer,
StatsC
         }
     }
 
+    // called with deliveredMessages locked
+    private void captureDeliveredMessagesForDuplicateSuppression() {
+        captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery (true);
+    }
+
+    private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(boolean
requireRedelivery) {
+        if (previouslyDeliveredMessages == null) {
+            previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
+        }
+        for (MessageDispatch delivered : deliveredMessages) {
+            previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), new PreviouslyDelivered(delivered,
!requireRedelivery));
+        }
+        LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(),
previouslyDeliveredMessages.transactionId, deliveredMessages.size());
+    }
+
     public int getMessageSize() {
         return unconsumedMessages.size();
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 1b4abc1..9b18d89 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -2097,9 +2097,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
     public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages)
throws JMSException {
 
         List<MessageDispatch> c = unconsumedMessages.removeAll();
-        for (MessageDispatch md : c) {
-            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
-        }
         Collections.reverse(c);
 
         for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index 804cc68..19b1ab0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 import javax.jms.*;
 
 import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -49,6 +50,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
+import static org.apache.activemq.command.ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY;
 import static org.junit.Assert.*;
 
 interface Configurer {
@@ -91,6 +94,7 @@ public class AMQ2149Test {
     
     public void createBroker(Configurer configurer) throws Exception {
         broker = new BrokerService();
+        broker.setAdvisorySupport(false);
         configurePersistenceAdapter(broker);
         
         broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
@@ -164,8 +168,9 @@ public class AMQ2149Test {
         public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException
{
             this.dest = dest;
             this.transactional = transactional;
-            connection = new ActiveMQConnectionFactory(brokerURL)
-                    .createConnection();
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
+            connectionFactory.setWatchTopicAdvisories(false);
+            connection = connectionFactory.createConnection();
             connection.setClientID(dest.toString());
             session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED
: Session.AUTO_ACKNOWLEDGE);
             if (ActiveMQDestination.transform(dest).isTopic()) {
@@ -224,7 +229,7 @@ public class AMQ2149Test {
                 lastId = message.getJMSMessageID();
             } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery)
{
                 ++nextExpectedSeqNum;
-                LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
+                LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery, expectedSometimesOnFailoverRecovery);
                 if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion
in doubt")) {
                     // in doubt - either commit command or reply missing
                     // don't know if we will get a replay
@@ -235,7 +240,17 @@ public class AMQ2149Test {
                     // batch will be replayed
                     nextExpectedSeqNum -= TRANSACITON_BATCH;
                 }
-
+            } catch (JMSException expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException)
{
+                ++nextExpectedSeqNum;
+                LOG.info("got rollback: " + expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException,
expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
+                if (expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException.getMessage().contains("xaErrorCode:100"))
{
+                    resumeOnNextOrPreviousIsOk = false;
+                    // batch will be replayed
+                    nextExpectedSeqNum -= TRANSACITON_BATCH;
+                } else {
+                    LOG.error(dest + " onMessage error:" + expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
+                    exceptions.add(expectedSometimesOnFailoverRecoveryWithNestedTransactionRolledBackException);
+                }
             } catch (Throwable e) {
                 LOG.error(dest + " onMessage error:" + e);
                 exceptions.add(e);
@@ -258,8 +273,9 @@ public class AMQ2149Test {
 
         public Sender(javax.jms.Destination dest) throws JMSException {
             this.dest = dest;
-            connection = new ActiveMQConnectionFactory(brokerURL)
-                    .createConnection();
+            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
+            activeMQConnectionFactory.setWatchTopicAdvisories(false);
+            connection = activeMQConnectionFactory.createConnection();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             messageProducer = session.createProducer(dest);
             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -360,7 +376,7 @@ public class AMQ2149Test {
         });
         
         verifyOrderedMessageReceipt();
-        verifyStats(false);
+        verifyStats(false, false);
     }
 
     @Test(timeout = 10 * 60 * 1000)
@@ -383,7 +399,7 @@ public class AMQ2149Test {
             timer.cancel();
         }
         
-        verifyStats(true);
+        verifyStats(true, false);
     }
 
     @Test(timeout = 10 * 60 * 1000)
@@ -403,7 +419,7 @@ public class AMQ2149Test {
             timer.cancel();
         }
         
-        verifyStats(true);
+        verifyStats(true, false);
     }
 
     @Test(timeout = 10 * 60 * 1000)
@@ -436,10 +452,10 @@ public class AMQ2149Test {
             timer.cancel();
         }
         
-        verifyStats(true);
+        verifyStats(true, true);
     }
 
-    private void verifyStats(boolean brokerRestarts) throws Exception {
+    private void verifyStats(boolean brokerRestarts, boolean transactional) throws Exception
{
         RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
         
         for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values())
{
@@ -455,6 +471,27 @@ public class AMQ2149Test {
                         stats.getEnqueues().getCount(), stats.getDequeues().getCount());
  
             }
         }
+        Destination activeMQDlq = regionBroker.getQueueRegion().getDestinationMap().get(new
ActiveMQQueue("ActiveMQ.DLQ"));
+        if (activeMQDlq != null) {
+
+            // excuse duplicates from the store
+            int countToExcuse = 0;
+            org.apache.activemq.command.Message[] messages = activeMQDlq.browse();
+            for (org.apache.activemq.command.Message candidate: messages) {
+                final Object cause = candidate.getProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause!= null &&
+                        ((((String)cause).contains(DUPLICATE_FROM_STORE_MSG_PREFIX)) ||
+                        !transactional && ((String)cause).contains("Suppressing duplicate
delivery on connection"))) {
+                    // expected some duplicate sends for durable subs
+                    countToExcuse++;
+                } else {
+                    LOG.error("Unexpected dlq: " + cause + ", " + candidate);
+                }
+            }
+
+            assertEquals("no unexpcted dlq messages", countToExcuse ,
+                    activeMQDlq.getDestinationStatistics().getMessages().getCount());
+        }
     }
 
     private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer)
{
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
new file mode 100644
index 0000000..cc5e6da
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.*;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.*;
+import javax.jms.Message;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class FailoverDurableSubTransactionTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverDurableSubTransactionTest.class);
+    private static final String TOPIC_NAME = "Failover.WithTx";
+    private static final String TRANSPORT_URI = "tcp://localhost:0";
+    private String url;
+    BrokerService broker;
+
+    public enum FailType {
+        ON_DISPATCH,
+        ON_ACK,
+        ON_COMMIT,
+        ON_DISPACH_WITH_REPLAY_DELAY
+    }
+
+    @Parameterized.Parameter(0)
+    public FailType failType;
+
+    @Parameterized.Parameters(name ="failType=#{0}")
+    public static Iterable<Object[]> parameters() {
+        return Arrays.asList(new Object[][]{
+                {FailType.ON_DISPATCH},
+                {FailType.ON_DISPACH_WITH_REPLAY_DELAY},
+                {FailType.ON_ACK},
+                {FailType.ON_COMMIT}
+        });
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+        broker = createBroker(deleteAllMessagesOnStartup);
+        broker.start();
+    }
+
+    public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws
Exception {
+        broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+        broker.start();
+    }
+
+    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception
{
+        return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
+    }
+
+    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress)
throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.addConnector(bindAddress);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        // faster redispatch
+        broker.setKeepDurableSubsActive(true);
+
+        url = broker.getTransportConnectors().get(0).getConnectUri().toString();
+
+        return broker;
+    }
+
+    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
+        factory.setWatchTopicAdvisories(false);
+        factory.getRedeliveryPolicy().setMaximumRedeliveries(-1);
+
+        if (!FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(failType)) {
+            factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
+            factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
+        }
+    }
+
+
+    @org.junit.Test
+    public void testFailoverCommit() throws Exception {
+
+        final AtomicInteger dispatchedCount = new AtomicInteger(0);
+        final int errorAt = FailType.ON_COMMIT.equals(failType) ? 1 : 9;
+        final int messageCount = 10;
+        broker = createBroker(true);
+
+        broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context, TransactionId
xid, boolean onePhase) throws Exception {
+                        if (FailType.ON_COMMIT.equals(failType) && dispatchedCount.incrementAndGet()
== errorAt) {
+                            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections())
{
+                                LOG.error("Whacking connection on commit: " + transportConnection);
+                                transportConnection.serviceException(new IOException("ERROR
NOW"));
+                            }
+                        } else {
+                            super.commitTransaction(context, xid, onePhase);
+                        }
+                    }
+
+                    @Override
+                    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck
ack) throws Exception {
+                        if (FailType.ON_ACK.equals(failType) && ack.getAckType()
== MessageAck.DELIVERED_ACK_TYPE && dispatchedCount.incrementAndGet() == errorAt)
{
+                            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections())
{
+                                LOG.error("Whacking connection on ack: " + transportConnection);
+                                transportConnection.serviceException(new IOException("ERROR
NOW"));
+                            }
+                        }
+                        super.acknowledge(consumerExchange, ack);
+                    }
+
+                    @Override
+                    public void postProcessDispatch(MessageDispatch messageDispatch) {
+                        super.postProcessDispatch(messageDispatch);
+                        if ((FailType.ON_DISPATCH.equals(failType) || FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(failType))
&& dispatchedCount.incrementAndGet() == errorAt) {
+                            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections())
{
+                                LOG.error("Whacking connection on dispatch: " + transportConnection);
+                                transportConnection.serviceException(new IOException("ERROR
NOW"));
+                            }
+                        }
+                    }
+                }
+        });
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setAlwaysSyncSend(true);
+        cf.setAlwaysSessionAsync(false);
+        cf.getPrefetchPolicy().setDurableTopicPrefetch(FailType.ON_ACK.equals(failType) ?
2 : 100);
+        configureConnectionFactory(cf);
+        Connection connection = cf.createConnection();
+        connection.setClientID("CID");
+        connection.start();
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Topic destination = session.createTopic(TOPIC_NAME);
+
+        MessageConsumer consumer = session.createDurableSubscriber(destination, "DS");
+        consumer.close();
+
+        produceMessage(destination, messageCount);
+        LOG.info("Production done! " + broker.getDestination(ActiveMQDestination.transform(destination)));
+
+
+        consumer = session.createDurableSubscriber(destination, "DS");
+
+        AtomicBoolean success = new AtomicBoolean(false);
+
+        HashSet<Integer> dupCheck = new HashSet<Integer>();
+        while (!success.get()) {
+            dupCheck.clear();
+            int i = 0;
+            for (i = 0; i < messageCount; i++) {
+                Message msg = consumer.receive(5000);
+                if (msg == null) {
+                    LOG.info("Failed to receive on: " + i);
+                    break;
+                }
+                LOG.info("Received: @" + i + ":" + msg.getJMSMessageID() + ", ID:" + msg.getIntProperty("ID"));
+                assertTrue("single instance of: " +  i, dupCheck.add( msg.getIntProperty("ID")));
+            }
+
+            try {
+                if (i == messageCount) {
+                    session.commit();
+                    success.set(true);
+                } else {
+                    session.rollback();
+                }
+            } catch (TransactionRolledBackException expected) {
+                LOG.info("Got expected", expected);
+                session.rollback();
+            }
+        }
+
+        consumer.close();
+        connection.close();
+
+        org.apache.activemq.broker.region.Destination dlq = broker.getDestination(ActiveMQDestination.transform(new
ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME)));
+        LOG.info("DLQ: " + dlq);
+        assertEquals("DLQ empty ", 0, dlq.getDestinationStatistics().getMessages().getCount());
+
+    }
+
+    @org.junit.Test
+    public void testFailoverCommitListener() throws Exception {
+
+        final AtomicInteger dispatchedCount = new AtomicInteger(0);
+        final int errorAt = FailType.ON_ACK.equals(failType) ? 1 : 1;
+        final int messageCount = 10;
+        broker = createBroker(true);
+
+        broker.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context, TransactionId
xid, boolean onePhase) throws Exception {
+                        LOG.info("commit request: " + xid);
+                        if (FailType.ON_COMMIT.equals(failType) && dispatchedCount.incrementAndGet()
== errorAt) {
+                            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections())
{
+                                LOG.error("Whacking connection on commit: " + transportConnection);
+                                transportConnection.serviceException(new IOException("ERROR
NOW"));
+                            }
+                        } else {
+                            super.commitTransaction(context, xid, onePhase);
+                        }
+                    }
+
+
+                    @Override
+                    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck
ack) throws Exception {
+                        LOG.info("ack request: " + ack);
+                        if (FailType.ON_ACK.equals(failType) /*&& ack.getAckType()
== MessageAck.DELIVERED_ACK_TYPE*/ && dispatchedCount.incrementAndGet() == errorAt)
{
+                            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections())
{
+                                LOG.error("Whacking connection on ack: " + transportConnection);
+                                transportConnection.serviceException(new IOException("ERROR
NOW"));
+                            }
+                        } else {
+                            super.acknowledge(consumerExchange, ack);
+                        }
+                    }
+                }
+
+        });
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setAlwaysSyncSend(true);
+        cf.setAlwaysSessionAsync(true);
+        //cf.getPrefetchPolicy().setDurableTopicPrefetch(FailType.ON_ACK.equals(failType)
? 2 : 100);
+        cf.setWatchTopicAdvisories(false);
+        Connection connection = cf.createConnection();
+        connection.setClientID("CID");
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Topic destination = session.createTopic(TOPIC_NAME);
+
+        MessageConsumer consumer = session.createDurableSubscriber(destination, "DS");
+        consumer.close();
+        connection.close();
+
+        produceMessage(destination, messageCount*2);
+        LOG.info("Production done! " + broker.getDestination(ActiveMQDestination.transform(destination)));
+
+
+        connection = cf.createConnection();
+        connection.setClientID("CID");
+        connection.start();
+        final Session receiveSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = receiveSession.createDurableSubscriber(destination, "DS");
+
+        AtomicBoolean success = new AtomicBoolean(false);
+
+        HashSet<Integer> dupCheck = new HashSet<Integer>();
+        final AtomicInteger receivedCount = new AtomicInteger();
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message msg) {
+                    try {
+                        int i = receivedCount.getAndIncrement();
+                        LOG.info("Received: @" + i + ":" + msg.getJMSMessageID() + ", ID:"
+ msg.getIntProperty("ID"));
+                        assertTrue("single instance of: " +  i, dupCheck.add( msg.getIntProperty("ID")));
+
+                        if (receivedCount.get() == messageCount) {
+                            receiveSession.commit();
+                            success.set(true);
+                        }
+                    } catch (TransactionRolledBackException expected) {
+                        LOG.info("Got expected", expected);
+                        try {
+                            receiveSession.rollback();
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                        dupCheck.clear();
+                        receivedCount.set(0);
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+            }
+        });
+        connection.start();
+
+        try {
+
+            assertTrue("committed ok", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return success.get();
+                }
+            }));
+        } finally {
+            consumer.close();
+            connection.close();
+        }
+
+        org.apache.activemq.broker.region.Destination dlq = broker.getDestination(ActiveMQDestination.transform(new
ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME)));
+        LOG.info("DLQ: " + dlq);
+        assertEquals("DLQ empty ", 0, dlq.getDestinationStatistics().getMessages().getCount());
+
+    }
+
+    private void produceMessage(Topic destination, int count)
+            throws JMSException {
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
+        configureConnectionFactory(cf);
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(destination);
+        TextMessage message = producerSession.createTextMessage("Test message");
+        for (int i=0; i<count; i++) {
+            message.setIntProperty("ID", i);
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 9beb4d6..b26f36e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -313,8 +314,10 @@ public class TopicProducerFlowControlTest extends TestCase implements
MessageLis
             c.close();
 
             // verify no pending sends completed in rolledback tx
-            assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
-
+            // temp dest should not exist
+            if (!ActiveMQDestination.transform(destination).isTemporary()) {
+                assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
+            }
         } finally {
             log4jLogger.removeAppender(appender);
         }


Mime
View raw message