This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 1a5b1fb ARTEMIS-2788 clear openwire producer state on produce close event
new 4eb3fe1 Merge pull request #3160 from gtully/ARTEMIS-2788
1a5b1fb is described below
commit 1a5b1fbe8efe69ff19b538490911209ceaf92823
Author: gtully <gary.tully@gmail.com>
AuthorDate: Wed Jun 3 13:33:18 2020 +0100
ARTEMIS-2788 clear openwire producer state on produce close event
---
.../core/protocol/openwire/OpenWireConnection.java | 7 +++++
.../openwire/SessionHandlingOpenWireTest.java | 35 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 2fa60de..970271c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1204,6 +1204,13 @@ public class OpenWireConnection extends AbstractRemotingConnection
implements Se
@Override
public Response processRemoveProducer(ProducerId id) throws Exception {
+ ConnectionState cs = getState();
+ if (cs != null) {
+ SessionState ss = cs.getSessionState(id.getParentId());
+ if (ss != null) {
+ ss.removeProducer(id);
+ }
+ }
return null;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
index c43d5f7..fec0fcc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
@@ -20,12 +20,17 @@ import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.SessionState;
import org.junit.AfterClass;
-import static org.junit.Assert.assertNotNull;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -81,4 +86,32 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up
resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
+
+
+ @Test
+ public void testProducerState() throws Exception {
+ try (Connection conn = factory.createConnection()) {
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination dest = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ for (int i = 0; i < 10; i++) {
+ MessageProducer messageProducer = session.createProducer(dest);
+ messageProducer.close();
+ }
+
+ // verify no trace of producer on the broker
+ for (RemotingConnection remotingConnection : server.getRemotingService().getConnections())
{
+ if (remotingConnection instanceof OpenWireConnection) {
+ OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
+ ConnectionState connectionState = openWireConnection.getState();
+ if (connectionState != null) {
+ for (SessionState sessionState : connectionState.getSessionStates()) {
+ assertTrue("no producer states leaked", Wait.waitFor(() -> sessionState.getProducerIds().isEmpty()));
+ }
+ }
+ }
+ }
+ }
+ }
}
|