qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gmur...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1276 - Added a cleanup function qdrc_address_endpoint_cleanup which will clean out the link's edge context in addition to cleaning out other state. This will prevent the crash outlined in the issue
Date Fri, 15 Mar 2019 17:57:16 GMT
This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 644935d  DISPATCH-1276 - Added a cleanup function qdrc_address_endpoint_cleanup which
will clean out the link's edge context in addition to cleaning out other state. This will
prevent the crash outlined in the issue
644935d is described below

commit 644935decfeb90af1fd0315437fd2256a0ba576c
Author: Ganesh Murthy <gmurthy@redhat.com>
AuthorDate: Thu Mar 7 09:30:32 2019 -0500

    DISPATCH-1276 - Added a cleanup function qdrc_address_endpoint_cleanup which will clean
out the link's edge context in addition to cleaning out other state. This will prevent the
crash outlined in the issue
---
 .../edge_addr_tracking/edge_addr_tracking.c        | 89 +++++++++++++++-------
 src/router_core/modules/edge_router/addr_proxy.c   |  2 +-
 2 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
index e161963..b568652 100644
--- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -31,7 +31,8 @@ struct qdr_addr_endpoint_state_t {
     qdrc_endpoint_t                    *endpoint;
     qdr_connection_t                   *conn;    // The connection associated with the endpoint.
     qdr_addr_tracking_module_context_t *mc;
-    qdr_link_t                         *link;
+    int                                ref_count;
+    bool                               closed; // Is the endpoint that this state belong
to closed?
 };
 
 DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
@@ -77,7 +78,7 @@ static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_
     return msg;
 }
 
-static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t
 endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t
 endpoint_state_list, qdr_connection_t *conn)
 {
     qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
     while(endpoint_state) {
@@ -101,12 +102,12 @@ static void qdrc_address_endpoint_first_attach(void              *bind_context,
     qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
 
     ZERO(endpoint_state);
-    endpoint_state->endpoint = endpoint;
-    endpoint_state->mc       = bc;
-    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
+    endpoint_state->endpoint  = endpoint;
+    endpoint_state->mc        = bc;
+    endpoint_state->conn      = qdrc_endpoint_get_connection_CT(endpoint);
 
-    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
 
+    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
 
     //
     // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created
only if this is a receiver link
@@ -133,18 +134,33 @@ static void qdrc_address_endpoint_on_first_detach(void *link_context,
 {
     qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
     qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint,
0);
-    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
-    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
-    endpoint_state->conn = 0;
-    endpoint_state->endpoint = 0;
-    if (endpoint_state->link) {
-        endpoint_state->link->edge_context = 0;
-        endpoint_state->link = 0;
-    }
-    free_qdr_addr_endpoint_state_t(endpoint_state);
     qdr_error_free(error);
 }
 
+static void qdrc_address_endpoint_cleanup(void *link_context)
+{
+    qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
+    if (endpoint_state) {
+        qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+        assert (endpoint_state->conn);
+        endpoint_state->closed = true;
+        if (endpoint_state->ref_count == 0) {
+
+            //
+            // The endpoint has been closed and no other links are referencing this endpoint.
Time to free it.
+            // Clean out all the states held by the link_context (endpoint_state)
+            //
+            if (mc) {
+                DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+            }
+
+            endpoint_state->conn = 0;
+            endpoint_state->endpoint = 0;
+            free_qdr_addr_endpoint_state_t(endpoint_state);
+        }
+    }
+}
+
 
 static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
 {
@@ -201,7 +217,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr
                 while (inlink) {
                     if(inlink->link->edge_context != 0) {
                         qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
-                        if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+                        if (!endpoint_state->closed && qdrc_can_send_address(addr,
endpoint_state->conn) ) {
                             qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
                             qdrc_send_message(addr_tracking->core, addr, endpoint, true);
                         }
@@ -222,7 +238,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr
             while (inlink) {
                 if(inlink->link->edge_context != 0) {
                     qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
-                    if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+                    if (!endpoint_state->closed && qdrc_can_send_address(addr,
endpoint_state->conn) ) {
                         qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
                         if (endpoint)
                             qdrc_send_message(addr_tracking->core, addr, endpoint, true);
@@ -244,9 +260,11 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr
                 while (inlink) {
                     if(inlink->link->edge_context != 0) {
                         qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
-                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
-                        if (endpoint)
-                            qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+                        if(!endpoint_state->closed) {
+                            qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+                            if (endpoint)
+                                qdrc_send_message(addr_tracking->core, addr, endpoint,
false);
+                        }
                     }
                     inlink = DEQ_NEXT(inlink);
                 }
@@ -268,10 +286,10 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr
 
             qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
             while (inlink) {
-                if(inlink->link->edge_context != 0) {
+                if (inlink->link->edge_context != 0) {
                     qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
                     qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
-                    if (endpoint_state->conn == link->conn) {
+                    if (endpoint_state->conn == link->conn && !endpoint_state->closed)
{
                         qdrc_send_message(addr_tracking->core, addr, endpoint, false);
                         break;
                     }
@@ -292,7 +310,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr
                 if(inlink->link->edge_context != 0) {
                     qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
                     qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
-                    if (link->conn == endpoint_state->conn) {
+                    if (link->conn == endpoint_state->conn && !endpoint_state->closed)
{
                         qdrc_send_message(addr_tracking->core, addr, endpoint, true);
                         break;
                     }
@@ -314,11 +332,12 @@ static void on_link_event(void *context, qdrc_event_t event, qdr_link_t
*link)
         {
             qdr_addr_tracking_module_context_t *mc = (qdr_addr_tracking_module_context_t
*) context;
             qdr_address_t *addr = link->owning_addr;
-            if (addr && qdr_address_is_mobile_CT(addr)) {
-                qdr_addr_endpoint_state_t *endpoint_state = qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list,
link->conn, link);
+            if (addr && qdr_address_is_mobile_CT(addr) && DEQ_SIZE(addr->subscriptions)
== 0 && link->link_direction == QD_INCOMING) {
+                qdr_addr_endpoint_state_t *endpoint_state = qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list,
link->conn);
+                assert(endpoint_state);
+                assert(link->edge_context == 0);
                 link->edge_context = endpoint_state;
-                endpoint_state->link = link;
-
+                endpoint_state->ref_count++;
                 if (qdrc_can_send_address(addr, link->conn) && endpoint_state)
{
                     qdrc_send_message(mc->core, addr, endpoint_state->endpoint, true);
                 }
@@ -329,10 +348,21 @@ static void on_link_event(void *context, qdrc_event_t event, qdr_link_t
*link)
         {
             if (link->edge_context) {
                 qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link->edge_context;
-                endpoint_state->link = 0;
+                endpoint_state->ref_count--;
                 link->edge_context = 0;
+                //
+                // The endpoint has been closed and no other links are referencing this endpoint.
Time to free it.
+                //
+                if (endpoint_state->ref_count == 0 && endpoint_state->closed)
{
+                    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+                    if (mc) {
+                        DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+                    }
+                    endpoint_state->conn = 0;
+                    endpoint_state->endpoint = 0;
+                    free_qdr_addr_endpoint_state_t(endpoint_state);
+                }
             }
-
             break;
         }
 
@@ -361,6 +391,7 @@ static void qdrc_edge_address_tracking_init_CT(qdr_core_t *core, void
**module_c
     context->addr_tracking_endpoint.label = "qdrc_edge_address_tracking_module_init_CT";
     context->addr_tracking_endpoint.on_first_attach  = qdrc_address_endpoint_first_attach;
     context->addr_tracking_endpoint.on_first_detach  = qdrc_address_endpoint_on_first_detach;
+    context->addr_tracking_endpoint.on_cleanup  = qdrc_address_endpoint_cleanup;
     qdrc_endpoint_bind_mobile_address_CT(core, QD_TERMINUS_EDGE_ADDRESS_TRACKING, '0', &context->addr_tracking_endpoint,
context);
 
     //
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index c73dfc0..ed593fd 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -160,7 +160,7 @@ static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr)
 
 static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr)
 {
-    if (addr->edge_outlink == 0) {
+    if (addr->edge_outlink == 0 && DEQ_SIZE(addr->subscriptions) == 0) {
         //
         // Note that this link must not be bound to the address at this time.  That will
         // happen later when the interior tells us that there are upstream destinations


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message