activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1924 Test consumer cleanup after socket connection reset
Date Tue, 19 Jun 2018 21:59:48 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 52695fa45 -> 7a4b21e3a


ARTEMIS-1924 Test consumer cleanup after socket connection reset

(cherry picked from commit 69d9b6094a8157bf0a6528420a33aa465b8bcec3)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a4b21e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a4b21e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a4b21e3

Branch: refs/heads/2.6.x
Commit: 7a4b21e3acdadf9f10b91c0f2cb6202c0d5bc10f
Parents: 52695fa
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Jun 19 17:42:46 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jun 19 17:59:22 2018 -0400

----------------------------------------------------------------------
 .../integration/amqp/AmqpNoHearbeatsTest.java   | 72 ++++++++++++++++++++
 1 file changed, 72 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a4b21e3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
index b35ef7d..6e35c02 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
@@ -16,15 +16,23 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.engine.Connection;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -80,4 +88,68 @@ public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
       connection.close();
    }
 
+   private static final String QUEUE_NAME = "queue://testHeartless";
+
+   // This test is validating a scenario where the client will leave with connection reset
+   // This is done by setting soLinger=0 on the socket, which will make the system to issue
a connection.reset instead of sending a
+   // disconnect.
+   @Test(timeout = 60000)
+   public void testCloseConsumerOnConnectionReset() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout());
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      assertNotNull(connection);
+
+      connection.getStateInspector().assertValid();
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
+
+      // This test needs a remote process exiting without closing the socket
+      // with soLinger=0 on the socket so it will issue a connection.reset
+      Process p = SpawnedVMSupport.spawnVM(AmqpNoHearbeatsTest.class.getName(), "testConnectionReset");
+      Assert.assertEquals(33, p.waitFor());
+
+      AmqpSender sender = session.createSender(QUEUE_NAME);
+
+      for (int i = 0; i < 10; i++) {
+         AmqpMessage msg = new AmqpMessage();
+         msg.setBytes(new byte[] {0});
+         sender.send(msg);
+      }
+
+      receiver.flow(20);
+
+      for (int i = 0; i < 10; i++) {
+         AmqpMessage msg = receiver.receive(1, TimeUnit.SECONDS);
+         Assert.assertNotNull(msg);
+         msg.accept();
+      }
+   }
+
+   public static void main(String[] arg) {
+      if (arg.length > 0 && arg[0].equals("testConnectionReset")) {
+         try {
+            AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:5672?transport.soLinger=0"),
null, null);
+            AmqpConnection connection = client.connect();
+            AmqpSession session = connection.createSession();
+            AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
+            receiver.flow(10);
+            System.exit(33);
+         } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+         }
+      }
+   }
+
 }


Mime
View raw message