activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1924 Add amqpIdleTimeout
Date Tue, 19 Jun 2018 16:23:47 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 477a6400c -> 3edce7245


ARTEMIS-1924 Add amqpIdleTimeout

(cherry picked from commit cb793e0e9827535fc3ce1c66b4ffbc41f513e9f5)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3edce724
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3edce724
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3edce724

Branch: refs/heads/2.6.x
Commit: 3edce7245643a08ba95b09c27761304c92527590
Parents: 477a640
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Jun 11 17:11:25 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jun 19 12:23:33 2018 -0400

----------------------------------------------------------------------
 .../amqp/broker/ProtonProtocolManager.java      |  28 ++-
 .../amqp/proton/AMQPConnectionContext.java      |   7 +-
 .../amqp/proton/handler/ProtonHandler.java      |  17 +-
 docs/user-manual/en/amqp.md                     |  29 ++++
 .../amqp/AmqpBrokerRequestedHearbeatsTest.java  | 170 +++++++++++++++++++
 .../amqp/AmqpBrokerReuqestedHearbeatsTest.java  | 144 ----------------
 .../integration/amqp/AmqpNoHearbeatsTest.java   |  83 +++++++++
 7 files changed, 327 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index c1a92e0..d86dc81 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -45,12 +45,15 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
 import io.netty.channel.ChannelPipeline;
+import org.jboss.logging.Logger;
 
 /**
  * A proton protocol manager, basically reads the Proton Input and maps proton resources
to ActiveMQ Artemis resources
  */
 public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor,
ActiveMQProtonRemotingConnection> implements NotificationListener {
 
+   private static final Logger logger = Logger.getLogger(ProtonProtocolManager.class);
+
    private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
 
    private final List<AmqpInterceptor> incomingInterceptors = new ArrayList<>();
@@ -72,6 +75,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
 
    private String saslLoginConfigScope = "amqp-sasl-gssapi";
 
+   private Long amqpIdleTimeout;
+
+
    /*
    * used when you want to treat senders as a subscription on an address rather than consuming
from the actual queue for
    * the address. This can be changed on the acceptor.
@@ -115,6 +121,17 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
       return false;
    }
 
+   public Long getAmqpIdleTimeout() {
+      return amqpIdleTimeout;
+   }
+
+   public ProtonProtocolManager setAmqpIdleTimeout(Long ttl) {
+      logger.debug("Setting up " + ttl + " as the connectionTtl");
+      this.amqpIdleTimeout = ttl;
+      return this;
+   }
+
+
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
{
       AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection,
server.getExecutorFactory().getExecutor(), server);
@@ -124,6 +141,14 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
          ttl = server.getConfiguration().getConnectionTTLOverride();
       }
 
+      if (getAmqpIdleTimeout() != null) {
+         ttl = getAmqpIdleTimeout().longValue();
+      }
+
+      if (ttl < 0) {
+         ttl = 0;
+      }
+
       String id = server.getConfiguration().getName();
       boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
       AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback,
id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming,
server.getScheduledPool(), true, null, null);
@@ -136,7 +161,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
 
       connectionCallback.setProtonConnectionDelegate(delegate);
 
-      ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(),
ttl);
+      // connection entry only understands -1 otherwise we would see disconnects for no reason
+      ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(),
ttl <= 0 ? -1 : ttl);
 
       return entry;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index ad3a44f..3552106 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -390,8 +390,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements
EventH
             scheduledPool.schedule(new Runnable() {
                @Override
                public void run() {
-                  long rescheduleAt = handler.tick(false);
-                  if (rescheduleAt != 0) {
+                  Long rescheduleAt = handler.tick(false);
+                  if (rescheduleAt == null) {
+                     // this mean tick could not acquire a lock, we will just retry in 10
milliseconds.
+                     scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
+                  } else if (rescheduleAt != 0) {
                      scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()),
TimeUnit.MILLISECONDS);
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 585f658..38ca7a7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -88,7 +88,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener
{
 
    public ProtonHandler(Executor flushExecutor, boolean isServer) {
       this.flushExecutor = flushExecutor;
-      this.readyListener = () -> flushExecutor.execute(() -> {
+      this.readyListener = () -> this.flushExecutor.execute(() -> {
          flush();
       });
       this.creationTime = System.currentTimeMillis();
@@ -105,8 +105,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener
{
       connection.collect(collector);
    }
 
-   public long tick(boolean firstTick) {
-      lock.lock();
+   public Long tick(boolean firstTick) {
+      if (firstTick) {
+         // the first tick needs to guarantee a lock here
+         lock.lock();
+      } else {
+         if (!lock.tryLock()) {
+            log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
+            // if we can't lock the scheduler will retry in a very short period of time instead
of holding the lock here
+            return null;
+         }
+      }
       try {
          if (!firstTick) {
             try {
@@ -122,7 +131,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener
{
                transport.close();
                connection.setCondition(new ErrorCondition());
             }
-            return 0;
+            return 0L;
          }
          return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/docs/user-manual/en/amqp.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/amqp.md b/docs/user-manual/en/amqp.md
index a201fbd..30cb2aa 100644
--- a/docs/user-manual/en/amqp.md
+++ b/docs/user-manual/en/amqp.md
@@ -127,3 +127,32 @@ message for later delivery:
 
 If both annotations are present in the same message then the broker will prefer
 the more specific `x-opt-delivery-time` value.
+
+## Configuring AMQP Idle Timeout
+
+It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout
in milliseconds on the acceptor.
+
+This will make the server to send an AMQP frame open to the client, with your configured
timeout / 2.
+
+So, if you configured your AMQP Idle Timeout to be 60000, the server will tell the client
to send frames every 30,000 milliseconds.
+
+
+```xml
+<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>
+```
+
+
+### Disabling Keep alive checks
+
+if you set amqpIdleTimeout=0 that will tell clients to not sending keep alive packets towards
the server. On this case
+you will rely on TCP to determine when the socket needs to be closed.
+
+```xml
+<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>
+```
+
+This contains a real example for configuring amqpIdleTimeout:
+
+```xml
+<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10</acceptor>
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java
new file mode 100644
index 0000000..fcc7acd
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test handling of heartbeats requested by the broker.
+ */
+@RunWith(Parameterized.class)
+public class AmqpBrokerRequestedHearbeatsTest extends AmqpClientTestSupport {
+
+   private final int TEST_IDLE_TIMEOUT = 1000;
+
+   @Parameterized.Parameters(name = "useOverride={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean useOverride;
+
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      if (!useOverride) {
+         params.put("amqpIdleTimeout", "" + TEST_IDLE_TIMEOUT);
+      }
+   }
+
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setConnectionTtlCheckInterval(TEST_IDLE_TIMEOUT / 3);
+      if (useOverride) {
+         server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT);
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            assertEquals("Broker did not send half the idle timeout", TEST_IDLE_TIMEOUT /
2, connection.getTransport().getRemoteIdleTimeout());
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      assertNotNull(connection);
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testBrokerSendsHalfConfiguredIdleTimeoutWhenClientSendsTimeout() throws Exception
{
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            assertEquals("Broker did not send half the idle timeout", TEST_IDLE_TIMEOUT /
2, connection.getTransport().getRemoteIdleTimeout());
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setIdleTimeout(TEST_IDLE_TIMEOUT * 4);
+      assertNotNull(connection);
+
+      connection.connect();
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testClientWithoutHeartbeatsGetsDropped() throws Exception {
+
+      final CountDownLatch disconnected = new CountDownLatch(1);
+
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      AmqpConnection connection = addConnection(client.createConnection());
+      assertNotNull(connection);
+
+      connection.setIdleProcessingDisabled(true);
+      connection.setListener(new AmqpConnectionListener() {
+
+         @Override
+         public void onException(Throwable ex) {
+            disconnected.countDown();
+         }
+      });
+
+      connection.connect();
+
+      assertEquals(1, server.getConnectionCount());
+      assertTrue(disconnected.await(30, TimeUnit.SECONDS));
+
+      connection.close();
+
+      Wait.assertEquals(0, server::getConnectionCount);
+   }
+
+   @Test(timeout = 60000)
+   public void testClientWithHeartbeatsStaysAlive() throws Exception {
+
+      final CountDownLatch disconnected = new CountDownLatch(1);
+
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      AmqpConnection connection = addConnection(client.createConnection());
+      assertNotNull(connection);
+
+      connection.setListener(new AmqpConnectionListener() {
+
+         @Override
+         public void onException(Throwable ex) {
+            disconnected.countDown();
+         }
+      });
+
+      connection.connect();
+
+      assertEquals(1, server.getConnectionCount());
+      assertFalse(disconnected.await(5, TimeUnit.SECONDS));
+
+      connection.close();
+
+      Wait.assertEquals(0, server::getConnectionCount);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java
deleted file mode 100644
index 8221ef6..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.apache.activemq.transport.amqp.client.AmqpClient;
-import org.apache.activemq.transport.amqp.client.AmqpConnection;
-import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
-import org.apache.activemq.transport.amqp.client.AmqpValidator;
-import org.apache.qpid.proton.engine.Connection;
-import org.junit.Test;
-
-/**
- * Test handling of heartbeats requested by the broker.
- */
-public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
-
-   private final int TEST_IDLE_TIMEOUT = 1000;
-
-   @Override
-   protected void addConfiguration(ActiveMQServer server) {
-      server.getConfiguration().setConnectionTtlCheckInterval(TEST_IDLE_TIMEOUT / 3);
-      server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT);
-   }
-
-   @Test(timeout = 60000)
-   public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
-      AmqpClient client = createAmqpClient();
-      assertNotNull(client);
-
-      client.setValidator(new AmqpValidator() {
-
-         @Override
-         public void inspectOpenedResource(Connection connection) {
-            assertEquals("Broker did not send half the idle timeout", TEST_IDLE_TIMEOUT /
2, connection.getTransport().getRemoteIdleTimeout());
-         }
-      });
-
-      AmqpConnection connection = addConnection(client.connect());
-      assertNotNull(connection);
-
-      connection.getStateInspector().assertValid();
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testBrokerSendsHalfConfiguredIdleTimeoutWhenClientSendsTimeout() throws Exception
{
-      AmqpClient client = createAmqpClient();
-      assertNotNull(client);
-
-      client.setValidator(new AmqpValidator() {
-
-         @Override
-         public void inspectOpenedResource(Connection connection) {
-            assertEquals("Broker did not send half the idle timeout", TEST_IDLE_TIMEOUT /
2, connection.getTransport().getRemoteIdleTimeout());
-         }
-      });
-
-      AmqpConnection connection = addConnection(client.createConnection());
-      connection.setIdleTimeout(TEST_IDLE_TIMEOUT * 4);
-      assertNotNull(connection);
-
-      connection.connect();
-      connection.getStateInspector().assertValid();
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testClientWithoutHeartbeatsGetsDropped() throws Exception {
-
-      final CountDownLatch disconnected = new CountDownLatch(1);
-
-      AmqpClient client = createAmqpClient();
-      assertNotNull(client);
-
-      AmqpConnection connection = addConnection(client.createConnection());
-      assertNotNull(connection);
-
-      connection.setIdleProcessingDisabled(true);
-      connection.setListener(new AmqpConnectionListener() {
-
-         @Override
-         public void onException(Throwable ex) {
-            disconnected.countDown();
-         }
-      });
-
-      connection.connect();
-
-      assertEquals(1, server.getConnectionCount());
-      assertTrue(disconnected.await(30, TimeUnit.SECONDS));
-
-      connection.close();
-
-      Wait.assertEquals(0, server::getConnectionCount);
-   }
-
-   @Test(timeout = 60000)
-   public void testClientWithHeartbeatsStaysAlive() throws Exception {
-
-      final CountDownLatch disconnected = new CountDownLatch(1);
-
-      AmqpClient client = createAmqpClient();
-      assertNotNull(client);
-
-      AmqpConnection connection = addConnection(client.createConnection());
-      assertNotNull(connection);
-
-      connection.setListener(new AmqpConnectionListener() {
-
-         @Override
-         public void onException(Throwable ex) {
-            disconnected.countDown();
-         }
-      });
-
-      connection.connect();
-
-      assertEquals(1, server.getConnectionCount());
-      assertFalse(disconnected.await(5, TimeUnit.SECONDS));
-
-      connection.close();
-
-      Wait.assertEquals(0, server::getConnectionCount);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3edce724/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
new file mode 100644
index 0000000..2d5b3cf
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
+
+   @Parameterized.Parameters(name = "useOverride={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean useOverride;
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      if (useOverride) {
+         server.getConfiguration().setConnectionTTLOverride(0);
+      } else {
+         server.getConfiguration().setConnectionTtlCheckInterval(500);
+      }
+   }
+
+
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      if (!useOverride) {
+         params.put("amqpIdleTimeout", "0");
+      }
+   }
+
+
+   @Test(timeout = 60000)
+   public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout());
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      assertNotNull(connection);
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+
+}


Mime
View raw message