activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1486 Core client should be notified if consumer is closed on broker side
Date Thu, 02 Nov 2017 11:19:58 GMT
ARTEMIS-1486 Core client should be notified if consumer is closed on broker side


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/61ce7a74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/61ce7a74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/61ce7a74

Branch: refs/heads/master
Commit: 61ce7a74545dd33340a3cf619c1b4ef9e8558bf2
Parents: 8703d9d
Author: Stanislav Knot <sknot@redhat.com>
Authored: Tue Oct 31 14:08:23 2017 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Thu Nov 2 11:19:35 2017 +0000

----------------------------------------------------------------------
 .../impl/ActiveMQServerControlImpl.java         |  1 +
 .../management/ActiveMQServerControlTest.java   | 32 ++++++++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61ce7a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 03defb5..b5b5a96 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1621,6 +1621,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements
Active
                for (ServerConsumer serverConsumer : serverConsumers) {
                   if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
                      serverConsumer.close(true);
+                     serverConsumer.disconnect();
                      return true;
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61ce7a74/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index e3725d8..fde247c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -57,6 +57,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@@ -1978,6 +1980,36 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
    }
 
+   @Test
+   public void testCloseConsumer() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString name = RandomUtil.randomSimpleString();
+      boolean durable = true;
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable,
-1, false, false);
+
+      ServerLocator receiveLocator = createInVMNonHALocator();
+      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
+      ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
+      final ClientConsumer consumer = receiveClientSession.createConsumer(name);
+      final ClientProducer producer = receiveClientSession.createProducer(name);
+
+      ServerSession ss = server.getSessions().iterator().next();
+      ServerConsumer sc = ss.getServerConsumers().iterator().next();
+
+      producer.send(receiveClientSession.createMessage(true));
+      consumer.receive(1000);
+
+      Assert.assertFalse(consumer.isClosed());
+      serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(),
Long.toString(sc.sequentialID()));
+      Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
+      Assert.assertTrue(consumer.isClosed());
+   }
+
    protected void scaleDown(ScaleDownHandler handler) throws Exception {
       SimpleString address = new SimpleString("testQueue");
       HashMap<String, Object> params = new HashMap<>();


Mime
View raw message