activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2440 Call timeout should retry the connection asynchronously
Date Mon, 05 Aug 2019 18:20:45 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 205a139  ARTEMIS-2440 Call timeout should retry the connection asynchronously
     new a098685  This closes #2786
205a139 is described below

commit 205a1399e7777ec6abc9f0a8f2cfe8b337a861f0
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Mon Aug 5 10:36:21 2019 -0400

    ARTEMIS-2440 Call timeout should retry the connection asynchronously
---
 .../core/client/impl/ServerLocatorImpl.java        |  5 +-
 .../core/impl/ActiveMQClientProtocolManager.java   | 11 +++-
 .../protocol/core/impl/RemotingConnectionImpl.java | 17 +++---
 .../spi/core/remoting/ClientProtocolManager.java   |  3 +
 .../amqp/broker/AMQPConnectionCallback.java        |  8 ---
 .../broker/ActiveMQProtonRemotingConnection.java   |  8 +--
 .../amqp/client/ProtonClientProtocolManager.java   |  6 ++
 .../core/protocol/openwire/OpenWireConnection.java |  4 +-
 .../protocol/openwire/OpenWireProtocolManager.java |  2 +-
 .../artemis/core/config/Configuration.java         |  3 +
 .../protocol/core/impl/CoreProtocolManager.java    |  2 +-
 .../wireformat/ReplicationSyncFileMessageTest.java |  4 +-
 ...ListenerForConnectionTimedOutExceptionTest.java | 68 ++++++++++++++++++++++
 13 files changed, 111 insertions(+), 30 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 8ac7c9e..a05a8af 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -587,7 +587,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
    @Override
    public ClientProtocolManager newProtocolManager() {
-      return getProtocolManagerFactory().newProtocolManager();
+      if (threadPool == null) {
+         throw new NullPointerException("No Thread Pool");
+      }
+      return getProtocolManagerFactory().newProtocolManager().setExecutor(new OrderedExecutor(threadPool));
    }
 
    @Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 9eb0ee5..b1d6cc8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
@@ -82,6 +83,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
 
    private ClientSessionFactoryInternal factoryInternal;
 
+   private Executor executor;
+
    /**
     * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch}
     */
@@ -158,6 +161,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
    }
 
    @Override
+   public ActiveMQClientProtocolManager setExecutor(Executor executor) {
+      this.executor = executor;
+      return this;
+   }
+
+   @Override
    public Lock lockSessionCreation() {
       try {
          Lock localFailoverLock = factoryInternal.lockFailover();
@@ -412,7 +421,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
                                      List<Interceptor> incomingInterceptors,
                                      List<Interceptor> outgoingInterceptors,
                                      TopologyResponseHandler topologyResponseHandler) {
-      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
+      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor);
 
       this.topologyResponseHandler = topologyResponseHandler;
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 065277a..418e3f1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -92,8 +92,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
                                  final long blockingCallTimeout,
                                  final long blockingCallFailoverTimeout,
                                  final List<Interceptor> incomingInterceptors,
-                                 final List<Interceptor> outgoingInterceptors) {
-      this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, null);
+                                 final List<Interceptor> outgoingInterceptors,
+                                 final Executor connectionExecutor) {
+      this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor);
    }
 
    /*
@@ -103,9 +104,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection
implement
                           final Connection transportConnection,
                           final List<Interceptor> incomingInterceptors,
                           final List<Interceptor> outgoingInterceptors,
-                          final Executor executor,
-                          final SimpleString nodeID) {
-      this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, executor, nodeID);
+                          final SimpleString nodeID,
+                          final Executor connectionExecutor) {
+      this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, nodeID, connectionExecutor);
    }
 
    private RemotingConnectionImpl(final PacketDecoder packetDecoder,
@@ -115,9 +116,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection
implement
                                   final List<Interceptor> incomingInterceptors,
                                   final List<Interceptor> outgoingInterceptors,
                                   final boolean client,
-                                  final Executor executor,
-                                  final SimpleString nodeID) {
-      super(transportConnection, executor);
+                                  final SimpleString nodeID,
+                                  final Executor connectionExecutor) {
+      super(transportConnection, connectionExecutor);
 
       this.packetDecoder = packetDecoder;
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
index e2c9fc1..37e699e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientProtocolManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.spi.core.remoting;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Lock;
 
 import io.netty.channel.ChannelPipeline;
@@ -27,6 +28,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public interface ClientProtocolManager {
 
+   ClientProtocolManager setExecutor(Executor executor);
+
    /// Life Cycle Methods:
 
    RemotingConnection connect(Connection transportConnection,
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index d34ce80..1667945 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -182,14 +182,6 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener
{
       }
    }
 
-   public Executor getExeuctor() {
-      if (protonConnectionDelegate != null) {
-         return protonConnectionDelegate.getExecutor();
-      } else {
-         return null;
-      }
-   }
-
    public void setConnection(AMQPConnectionContext connection) {
       this.amqpConnection = connection;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 5692079..971702e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -46,17 +46,13 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
    public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
                                            AMQPConnectionContext amqpConnection,
                                            Connection transportConnection,
-                                           Executor executor) {
-      super(transportConnection, executor);
+                                           Executor connectionExecutor) {
+      super(transportConnection, connectionExecutor);
       this.manager = manager;
       this.amqpConnection = amqpConnection;
       transportConnection.setProtocolConnection(this);
    }
 
-   public Executor getExecutor() {
-      return this.executor;
-   }
-
    public ProtonProtocolManager getManager() {
       return manager;
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
index 54b8c67..43ae226 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -45,6 +46,11 @@ public class ProtonClientProtocolManager extends ProtonProtocolManager
implement
    }
 
    @Override
+   public ClientProtocolManager setExecutor(Executor executor) {
+      return null;
+   }
+
+   @Override
    public void stop() {
       throw new UnsupportedOperationException();
    }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0b94ab2..6c846ba 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -198,9 +198,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
-                             Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
-                             OpenWireFormat wf) {
+                             OpenWireFormat wf,
+                             Executor executor) {
       super(connection, executor);
       this.server = server;
       this.operationContext = server.newOperationContext();
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 505564d..44cc8da 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -232,7 +232,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection)
{
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
+      OpenWireConnection owConn = new OpenWireConnection(connection, server, this, wf, server.getExecutorFactory().getExecutor());
       owConn.sendHandshake();
 
       //first we setup ttl to -1
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 40fb4f3..c3fb985 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -328,15 +328,18 @@ public interface Configuration {
    Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming);
 
    /**
+    * deprecated: we decide based on the semantic context when to make things async or not
     * Returns whether code coming from connection is executed asynchronously or not. <br>
     * Default value is
     * {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}.
     */
+   @Deprecated
    boolean isAsyncConnectionExecutionEnabled();
 
    /**
     * Sets whether code coming from connection is executed asynchronously or not.
     */
+   @Deprecated
    Configuration setEnabledAsyncConnectionExecution(boolean enabled);
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 7a416d9..5862003 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(), connectionExecutor);
 
       Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
index 812de2c..f01e5e6 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
@@ -49,7 +49,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
       FileChannel fileChannel = raf.getChannel();
       ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
                                                                                         
    null, 10, raf, fileChannel, 0, dataSize);
-      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
       ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
       Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize()
- DataConstants.SIZE_INT);
       Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize()
- dataSize);
@@ -69,7 +69,7 @@ public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
       FileChannel fileChannel = raf.getChannel();
       ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
                                                                                         
    null, fileId, raf, fileChannel, 0, 0);
-      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
       ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
       Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize()
- DataConstants.SIZE_INT);
       Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
index 6579e0b..3206c2c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.jms.connection;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -50,6 +51,18 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
 
    @Test(timeout = 60000)
    public void testOnAcknowledge() throws Exception {
+      testOnAcknowledge(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testOnAcknowledgeBlockOnFailover() throws Exception {
+      // this is validating a case where failover would block
+      // and yet the exception should already happen asynchronously
+      testOnAcknowledge(true);
+   }
+
+   public void testOnAcknowledge(boolean blockOnFailover) throws Exception {
+      mayBlock.set(blockOnFailover);
       Connection sendConnection = null;
       Connection connection = null;
       AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
@@ -86,6 +99,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
          fail("JMSException expected");
 
       } catch (JMSException e) {
+         if (blockOnFailover) {
+            Wait.assertTrue(blocked::get);
+            unblock();
+         }
          assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
          //Ensure JMS Connection ExceptionListener was also invoked
          assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100));
@@ -102,6 +119,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
 
    @Test(timeout = 60000)
    public void testOnSend() throws Exception {
+      testOnSend(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testOnSendBlockOnFailover() throws Exception {
+      testOnSend(true);
+   }
+
+   public void testOnSend(boolean blockOnFailover) throws Exception {
+      mayBlock.set(blockOnFailover);
       Connection sendConnection = null;
       Connection connection = null;
       AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
@@ -125,6 +152,10 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
          fail("JMSException expected");
 
       } catch (JMSException e) {
+         if (blockOnFailover) {
+            Wait.assertTrue(blocked::get);
+            unblock();
+         }
          assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
          //Ensure JMS Connection ExceptionListener was also invoked
          assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100));
@@ -140,6 +171,30 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
       }
    }
 
+   static AtomicBoolean mayBlock = new AtomicBoolean(true);
+   static AtomicBoolean blocked = new AtomicBoolean(false);
+
+   private static void block() {
+      if (!mayBlock.get()) {
+         return;
+      }
+
+      blocked.set(true);
+
+      try {
+         long timeOut = System.currentTimeMillis() + 5000;
+         while (mayBlock.get() && System.currentTimeMillis() < timeOut) {
+            Thread.yield();
+         }
+      } finally {
+         blocked.set(false);
+      }
+   }
+
+   private static void unblock() {
+      mayBlock.set(false);
+   }
+
 
    static Packet lastPacketSent;
 
@@ -156,6 +211,12 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
 
       @Override
       public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
+         // CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchronously to the retry
+         if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
+            block();
+            return true;
+         }
+
          if (lastPacketSent.getType() == PacketImpl.SESS_ACKNOWLEDGE && packet.getType()
== PacketImpl.NULL_RESPONSE) {
             return false;
          }
@@ -167,9 +228,16 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends
JMSTest
 
       @Override
       public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
+         // CheckForFailoverReply is ignored here, as this is simulating an issue where the server is completely not responding, the blocked call should throw an exception asynchronously to the retry
+         if (packet.getType() == PacketImpl.CHECK_FOR_FAILOVER_REPLY) {
+            block();
+            return true;
+         }
+
          if (lastPacketSent.getType() == PacketImpl.SESS_SEND && packet.getType()
== PacketImpl.NULL_RESPONSE) {
             return false;
          }
+
          return true;
       }
    }


Mime
View raw message