activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelpea...@apache.org
Subject [activemq-nms-amqp] branch master updated: AMQNET-609: Error during message delivery may block consumer
Date Thu, 05 Sep 2019 11:08:55 GMT
This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/master by this push:
     new bd00f3c  AMQNET-609: Error during message delivery may block consumer
     new 69d9b17  Merge pull request #26 from Havret/exception_during_auto_ack_should_not_stop_consumer_from_receiving_messages
bd00f3c is described below

commit bd00f3c56e05e4eefcef5f22beb9df7424506570
Author: Havret <h4vret@gmail.com>
AuthorDate: Thu Sep 5 00:53:53 2019 +0200

    AMQNET-609: Error during message delivery may block consumer
---
 src/NMS.AMQP/NmsConnection.cs                      |  2 +-
 src/NMS.AMQP/NmsMessageConsumer.cs                 | 87 +++++++++++++---------
 .../Integration/FailoverIntegrationTest.cs         | 65 ++++++++++++++++
 3 files changed, 116 insertions(+), 38 deletions(-)

diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 586fa62..f8bf283 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -514,7 +514,7 @@ namespace Apache.NMS.AMQP
             }
         }
 
-        public void OnAsyncException(Exception error)
+        internal void OnAsyncException(Exception error)
         {
             ExceptionListener?.Invoke(error);
         }
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index deb3141..13a4cf0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -213,55 +213,68 @@ namespace Apache.NMS.AMQP
             {
                 lock (SyncRoot)
                 {
-                    if (started && Listener != null)
+                    try
                     {
-                        var envelope = messageQueue.DequeueNoWait();
-                        if (envelope == null)
-                            return;
-
-                        if (IsMessageExpired(envelope))
-                        {
-                            if (Tracer.IsDebugEnabled)
-                                Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
-
-                            DoAckExpired(envelope);
-                        }
-                        else if (IsRedeliveryExceeded(envelope))
+                        if (started && Listener != null)
                         {
-                            if (Tracer.IsDebugEnabled)
-                                Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
-
-                            // TODO: Apply redelivery policy
-                            DoAckExpired(envelope);
-                        }
-                        else
-                        {
-                            bool deliveryFailed = false;
-                            bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
-
-                            if (autoAckOrDupsOk)
-                                DoAckDelivered(envelope);
-                            else
-                                AckFromReceive(envelope);
+                            var envelope = messageQueue.DequeueNoWait();
+                            if (envelope == null)
+                                return;
 
-                            try
+                            if (IsMessageExpired(envelope))
                             {
-                                Listener.Invoke(envelope.Message.Copy());
+                                if (Tracer.IsDebugEnabled)
+                                    Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
+
+                                DoAckExpired(envelope);
                             }
-                            catch (Exception)
+                            else if (IsRedeliveryExceeded(envelope))
                             {
-                                deliveryFailed = true;
-                            }
+                                if (Tracer.IsDebugEnabled)
+                                    Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
 
-                            if (autoAckOrDupsOk)
+                                // TODO: Apply redelivery policy
+                                DoAckExpired(envelope);
+                            }
+                            else
                             {
-                                if (!deliveryFailed)
-                                    DoAckConsumed(envelope);
+                                bool deliveryFailed = false;
+                                bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
+
+                                if (autoAckOrDupsOk)
+                                    DoAckDelivered(envelope);
                                 else
-                                    DoAckReleased(envelope);
+                                    AckFromReceive(envelope);
+
+                                try
+                                {
+                                    Listener.Invoke(envelope.Message.Copy());
+                                }
+                                catch (Exception)
+                                {
+                                    deliveryFailed = true;
+                                }
+
+                                if (autoAckOrDupsOk)
+                                {
+                                    if (!deliveryFailed)
+                                        DoAckConsumed(envelope);
+                                    else
+                                        DoAckReleased(envelope);
+                                }
                             }
                         }
                     }
+                    catch (Exception e)
+                    {
+                        // TODO - There are two cases when we can get an error here:
+                        // 1) error returned from the attempted ACK that was sent
+                        // 2) error while attempting to copy the incoming message.
+                        //
+                        // We need to decide how to respond to these, but definitely we cannot
+                        // let this error propagate as it could take down the SessionDispatcher
+                        Session.Connection.OnAsyncException(e);
+                    }
                 }
             }
         }
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 897daa0..e4fb515 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -952,6 +952,71 @@ namespace NMS.AMQP.Test.Integration
             }
         }
 
+        [Test, Timeout(20_000)]
+        public void TestConsumerCanReceivesMessagesWhenConnectionLostDuringAutoAck()
+        {
+            using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+            using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+            {
+                ManualResetEvent originalConnected = new ManualResetEvent(false);
+                ManualResetEvent finalConnected = new ManualResetEvent(false);
+                ManualResetEvent exceptionThrown = new ManualResetEvent(false);
+
+                // Connect to the first peer
+                originalPeer.ExpectSaslAnonymous();
+                originalPeer.ExpectOpen();
+                originalPeer.ExpectBegin();
+                originalPeer.ExpectBegin();
+
+                NmsConnection connection = EstablishAnonymousConnection(originalPeer, finalPeer);
+                connection.ExceptionListener += exception => { exceptionThrown.Set(); };
+
+                Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+
+                connectionListener
+                    .Setup(listener => listener.OnConnectionEstablished(It.IsAny<Uri>()))
+                    .Callback(() => { originalConnected.Set(); });
+
+                connectionListener
+                    .Setup(listener => listener.OnConnectionRestored(It.IsAny<Uri>()))
+                    .Callback(() => { finalConnected.Set(); });
+
+                connection.AddConnectionListener(connectionListener.Object);
+
+                connection.Start();
+
+                Assert.True(originalConnected.WaitOne(TimeSpan.FromSeconds(5)), "Should connect to original peer");
+
+                originalPeer.ExpectReceiverAttach();
+                originalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+                originalPeer.DropAfterLastMatcher();
+
+                // Post Failover Expectations of FinalPeer
+                finalPeer.ExpectSaslAnonymous();
+                finalPeer.ExpectOpen();
+                finalPeer.ExpectBegin();
+                finalPeer.ExpectBegin();
+                finalPeer.ExpectReceiverAttach();
+                finalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+                finalPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+                int msgReceivedCount = 0;
+                messageConsumer.Listener += message =>
+                {
+                    finalConnected.WaitOne(TimeSpan.FromSeconds(5));
+                    msgReceivedCount++;
+                };
+
+                finalPeer.WaitForAllMatchersToComplete(5000);
+
+                Assert.AreEqual(2, msgReceivedCount);
+                Assert.IsTrue(exceptionThrown.WaitOne(TimeSpan.FromSeconds(1)));
+            }
+        }
+
         private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
         {
             return EstablishAnonymousConnection(null, null, peers);


Mime
View raw message