activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2327 ExceptionListener invoked when connection level exception
Date Sat, 04 May 2019 03:11:55 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new b60c304  ARTEMIS-2327 ExceptionListener invoked when connection level exception
     new c56e84c  This closes #2655
b60c304 is described below

commit b60c304c70e16fc9724c736da8a3fbc5fe40ad04
Author: Michael André Pearce <michael.andre.pearce@me.com>
AuthorDate: Tue Apr 30 21:49:19 2019 +0100

    ARTEMIS-2327 ExceptionListener invoked when connection level exception
    
    Add tests
    Add fix - if a timeout occurs while sending a packet, call the same code that is invoked when a timeout
occurs during ping, aligning the logic and ensuring that the JMS connection exception listener gets
invoked to inform the client logic to react.
---
 .../core/protocol/core/impl/ChannelImpl.java       |   4 +-
 ...ListenerForConnectionTimedOutExceptionTest.java | 173 +++++++++++++++++++++
 .../tests/integration/remoting/ReconnectTest.java  |   6 +-
 3 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 3fcb9cf..fe876ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -451,7 +451,9 @@ public final class ChannelImpl implements Channel {
             }
 
             if (response == null) {
-               throw ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(),
packet.getType());
+               ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(),
packet.getType());
+               connection.fail(e);
+               throw e;
             }
 
             if (response.getType() == PacketImpl.EXCEPTION) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
new file mode 100644
index 0000000..5b5875b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.connection;
+
+import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTestBase {
+
+   private Queue queue;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      queue = createQueue("TestQueue");
+   }
+
+   @Test(timeout = 60000)
+   public void testOnAcknowledge() throws Exception {
+      Connection sendConnection = null;
+      Connection connection = null;
+      AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
+
+      try {
+         ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
+         ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName());
+         ((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true);
+
+         sendConnection = cf.createConnection();
+
+         final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = sendSession.createProducer(queue);
+
+         TextMessage message = sendSession.createTextMessage();
+
+         message.setText("Message");
+
+         producer.send(message);
+
+         connection = cf.createConnection();
+         connection.start();
+         connection.setExceptionListener(exceptionOnConnection::set);
+         final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         final MessageConsumer messageConsumer = consumerSession.createConsumer(queue);
+
+         TextMessage message1 = (TextMessage) messageConsumer.receive(1000);
+
+         assertEquals("Message", message1.getText());
+
+         message1.acknowledge();
+
+         fail("JMSException expected");
+
+      } catch (JMSException e) {
+         assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
+         //Ensure JMS Connection ExceptionListener was also invoked
+         assertNotNull(exceptionOnConnection.get());
+         assertTrue(exceptionOnConnection.get().getCause() instanceof ActiveMQConnectionTimedOutException);
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+         if (sendConnection != null) {
+            sendConnection.close();
+         }
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testOnSend() throws Exception {
+      Connection sendConnection = null;
+      Connection connection = null;
+      AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
+
+      try {
+         ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
+         ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName());
+
+         sendConnection = cf.createConnection();
+         sendConnection.setExceptionListener(exceptionOnConnection::set);
+         final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = sendSession.createProducer(queue);
+
+         TextMessage message = sendSession.createTextMessage();
+
+         message.setText("Message");
+
+         producer.send(message);
+
+         fail("JMSException expected");
+
+      } catch (JMSException e) {
+         assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
+         //Ensure JMS Connection ExceptionListener was also invoked
+         assertNotNull(exceptionOnConnection.get());
+         assertTrue(exceptionOnConnection.get().getCause() instanceof ActiveMQConnectionTimedOutException);
+
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+         if (sendConnection != null) {
+            sendConnection.close();
+         }
+      }
+   }
+
+
+   static Packet lastPacketSent;
+
+   public static class OutBoundPacketCapture implements Interceptor {
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
+         lastPacketSent = packet;
+         return true;
+      }
+   }
+
+   public static class SessAcknowledgeCauseResponseTimeout implements Interceptor {
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
+         if (lastPacketSent.getType() == PacketImpl.SESS_ACKNOWLEDGE && packet.getType()
== PacketImpl.NULL_RESPONSE) {
+            return false;
+         }
+         return true;
+      }
+   }
+
+   public static class SessSendCauseResponseTimeout implements Interceptor {
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
+         if (lastPacketSent.getType() == PacketImpl.SESS_SEND && packet.getType()
== PacketImpl.NULL_RESPONSE) {
+            return false;
+         }
+         return true;
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
index 6eadf4f..c15b175 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
@@ -330,9 +330,13 @@ public class ReconnectTest extends ActiveMQTestBase {
       server.start();
       // imitate session reattach timeout
       Interceptor reattachInterceptor = new Interceptor() {
+
+         boolean reattached;
+
          @Override
          public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
{
-            if (packet.getType() == PacketImpl.REATTACH_SESSION) {
+            if (!reattached && packet.getType() == PacketImpl.REATTACH_SESSION) {
+               reattached = true;
                return false;
             } else {
                return true;


Mime
View raw message