activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6089 - support TMNOFLAGS as a scan end to allow looping calls to recover to terminate
Date Wed, 06 Jan 2016 12:54:46 GMT
Repository: activemq
Updated Branches:
  refs/heads/master e3df09b9d -> 16bc0f0d7


https://issues.apache.org/jira/browse/AMQ-6089 - support TMNOFLAGS as a scan end to allow looping calls to recover to terminate


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/16bc0f0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/16bc0f0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/16bc0f0d

Branch: refs/heads/master
Commit: 16bc0f0d750530643333695ba20f6a736704271c
Parents: e3df09b
Author: gtully <gary.tully@gmail.com>
Authored: Wed Jan 6 12:54:20 2016 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Jan 6 12:54:20 2016 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/TransactionContext.java | 40 +++++++++++---------
 .../activemq/broker/XARecoveryBrokerTest.java   | 16 ++++++++
 2 files changed, 39 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/16bc0f0d/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 6bd7402..77826b1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -672,26 +672,32 @@ public class TransactionContext implements XAResource {
     @Override
     public Xid[] recover(int flag) throws XAException {
         LOG.debug("recover({})", flag);
+        XATransactionId[] answer;
 
-        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
-        try {
-            this.connection.checkClosedOrFailed();
-            this.connection.ensureConnectionInfoSent();
-
-            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
-            DataStructure[] data = receipt.getData();
-            XATransactionId[] answer;
-            if (data instanceof XATransactionId[]) {
-                answer = (XATransactionId[])data;
-            } else {
-                answer = new XATransactionId[data.length];
-                System.arraycopy(data, 0, answer, 0, data.length);
+        if (XAResource.TMNOFLAGS == flag) {
+            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
+            // allows looping scan to complete
+            answer = new XATransactionId[0];
+        } else {
+            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
+            try {
+                this.connection.checkClosedOrFailed();
+                this.connection.ensureConnectionInfoSent();
+
+                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
+                DataStructure[] data = receipt.getData();
+                if (data instanceof XATransactionId[]) {
+                    answer = (XATransactionId[]) data;
+                } else {
+                    answer = new XATransactionId[data.length];
+                    System.arraycopy(data, 0, answer, 0, data.length);
+                }
+            } catch (JMSException e) {
+                throw toXAException(e);
             }
-            LOG.debug("recover({})={}", flag, answer);
-            return answer;
-        } catch (JMSException e) {
-            throw toXAException(e);
         }
+        LOG.debug("recover({})={}", flag, answer);
+        return answer;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/16bc0f0d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 2c41673..9660ef0 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -24,7 +26,11 @@ import javax.jms.JMSException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.TransactionContext;
 import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
@@ -102,6 +108,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         dar = (DataArrayResponse)response;
         assertEquals(4, dar.getData().length);
 
+        // verify XAResource scan loop
+        XAResource transactionContextXAResource = new TransactionContext(ActiveMQConnection.makeConnection(broker.getVmConnectorURI().toString()));
+        LinkedList<Xid> tracked = new LinkedList<Xid>();
+        Xid[] recoveryXids = transactionContextXAResource.recover(XAResource.TMSTARTRSCAN);
+        while (recoveryXids.length > 0) {
+            tracked.addAll(Arrays.asList(recoveryXids));
+            recoveryXids = transactionContextXAResource.recover(XAResource.TMNOFLAGS);
+        }
+        assertEquals("got 4 via scan loop", 4, tracked.size());
+
         // validate destination depth via jmx
         DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
         assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());


Mime
View raw message