activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] 01/03: ARTEMIS-2750 Fixing out of order cancellations in AMQP
Date Mon, 04 May 2020 19:41:11 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7438d7ff070ef941f61a66a41c88b74fa0380f32
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Mon May 4 12:22:23 2020 -0400

    ARTEMIS-2750 Fixing out of order cancellations in AMQP
---
 .../amqp/proton/AMQPConnectionContext.java         | 12 +++++++---
 .../amqp/proton/ProtonServerSenderContext.java     | 28 +++++++++++++++++-----
 .../core/server/impl/MessageReferenceImpl.java     |  5 +++-
 3 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 505e229..9bb3d70 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -544,8 +544,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
 
       // We scheduled it for later, as that will work through anything that's pending on the current deliveries.
       runNow(() -> {
-         link.close();
-         link.free();
 
          ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
          if (linkContext != null) {
@@ -555,7 +553,15 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
                log.error(e.getMessage(), e);
             }
          }
-         flush();
+
+         // We have to perform link.close() after linkContext.close() has finished.
+         // linkContext.close() will perform a few executions on the Netty loop,
+         // so this has to come next.
+         runLater(() -> {
+            link.close();
+            link.free();
+            flush();
+         });
 
       });
    }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ca3d418..b951b0b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -569,8 +569,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
     */
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
+      // we need to mark closed first to make sure no more adds are accepted
+      closed = true;
+
+      // MessageReferences are sent to the Connection executor (Netty Loop)
+      // as a result the returning references have to be done later after they
+      // had their chance to finish and clear the runnable
+      connection.runLater(() -> {
+         try {
+            internalClose(remoteLinkClose);
+         } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+         }
+      });
+   }
+
+   private void internalClose(boolean remoteLinkClose) throws ActiveMQAMQPException {
       try {
-         closed = true;
          protonSession.removeSender(sender);
          sessionSPI.closeSender(brokerConsumer);
          // if this is a link close rather than a connection close or detach, we need to delete
@@ -836,6 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       void resume() {
          connection.runNow(this::deliver);
       }
+
       void deliver() {
 
          // This is discounting some bytes due to Transfer payload
@@ -994,11 +1010,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    }
 
    private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
-                                         String clientId,
-                                         String pubId,
-                                         boolean shared,
-                                         boolean global,
-                                         boolean isVolatile) {
+                                               String clientId,
+                                               String pubId,
+                                               boolean shared,
+                                               boolean global,
+                                               boolean isVolatile) {
       if (useCoreSubscriptionNaming) {
          final boolean durable = !isVolatile;
          final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index e9238fe..dea1478 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -114,7 +114,10 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    @Override
    public void onDelivery(Consumer<? super MessageReference> onDelivery) {
-      assert this.onDelivery == null;
+      // I am keeping this commented out as a documentation feature:
+      // a Message reference may eventually be taken back before the connection.run was finished.
+      // as a result it may be possible to have this.onDelivery != null here due to cancellations.
+      // assert this.onDelivery == null;
       this.onDelivery = onDelivery;
    }
 


Mime
View raw message