activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2788 clear openwire producer state on produce close event
Date Wed, 03 Jun 2020 13:57:32 GMT
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a5b1fb  ARTEMIS-2788 clear openwire producer state on produce close event
     new 4eb3fe1  Merge pull request #3160 from gtully/ARTEMIS-2788
1a5b1fb is described below

commit 1a5b1fbe8efe69ff19b538490911209ceaf92823
Author: gtully <gary.tully@gmail.com>
AuthorDate: Wed Jun 3 13:33:18 2020 +0100

    ARTEMIS-2788 clear openwire producer state on produce close event
---
 .../core/protocol/openwire/OpenWireConnection.java |  7 +++++
 .../openwire/SessionHandlingOpenWireTest.java      | 35 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 2fa60de..970271c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1204,6 +1204,13 @@ public class OpenWireConnection extends AbstractRemotingConnection
implements Se
 
       @Override
       public Response processRemoveProducer(ProducerId id) throws Exception {
+         ConnectionState cs = getState();
+         if (cs != null) {
+            SessionState ss = cs.getSessionState(id.getParentId());
+            if (ss != null) {
+               ss.removeProducer(id);
+            }
+         }
          return null;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
index c43d5f7..fec0fcc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SessionHandlingOpenWireTest.java
@@ -20,12 +20,17 @@ import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.SessionState;
 import org.junit.AfterClass;
-import static org.junit.Assert.assertNotNull;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -81,4 +86,32 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
       assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up
resources for session"));
       assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
    }
+
+
+   @Test
+   public void testProducerState() throws Exception {
+      try (Connection conn = factory.createConnection()) {
+         conn.start();
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination dest = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+         for (int i = 0; i < 10; i++) {
+            MessageProducer messageProducer = session.createProducer(dest);
+            messageProducer.close();
+         }
+
+         // verify no trace of producer on the broker
+         for (RemotingConnection remotingConnection : server.getRemotingService().getConnections())
{
+            if (remotingConnection instanceof OpenWireConnection) {
+               OpenWireConnection openWireConnection = (OpenWireConnection) remotingConnection;
+               ConnectionState connectionState = openWireConnection.getState();
+               if (connectionState != null) {
+                  for (SessionState sessionState : connectionState.getSessionStates()) {
+                     assertTrue("no producer states leaked", Wait.waitFor(() -> sessionState.getProducerIds().isEmpty()));
+                  }
+               }
+            }
+         }
+      }
+   }
 }


Mime
View raw message