Repository: activemq
Updated Branches:
refs/heads/master 7413ee00e -> bd8661796
AMQ-6697 Adds a test to show that the described case works
Correctly ACK inside a TX and then Abort and then ACK again outside a TX
to show that the broker will then mark the message as consumed.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd866179
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd866179
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd866179
Branch: refs/heads/master
Commit: bd8661796b190ef605458cdd7a0d90d9af4f51a0
Parents: 7413ee0
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jun 2 11:50:14 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jun 2 11:50:14 2017 -0400
----------------------------------------------------------------------
.../activemq/transport/stomp/Stomp11Test.java | 59 ++++++++++++++++++++
1 file changed, 59 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd866179/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index a707cf2..f61c899 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -35,6 +35,7 @@ import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
@@ -1128,4 +1129,62 @@ public class Stomp11Test extends StompTestSupport {
assertEquals(view.getDurableTopicSubscribers().length, 2);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
}
+
+ @Test(timeout = 60000)
+ public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+ producer.close();
+
+ String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+ "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("CONNECTED"));
+
+ QueueViewMBean queueView = getProxyToQueue(getQueueName());
+ assertEquals(1, queueView.getQueueSize());
+
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ // ack it in the TX then abort
+ frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ // rollback first message
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertEquals(1, queueView.getQueueSize());
+
+ // ack it outside the TX and it should be really ack'd
+ frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 0;
+ }
+ }));
+
+ String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsub);
+
+ String receipt = stompConnection.receiveFrame();
+ assertTrue(receipt.contains("RECEIPT"));
+ }
}
|