qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject [qpid-dispatch] 01/02: DISPATCH-1289 - Added link-level logging at the INFO level, including full terminus information and terminal stats DISPATCH-1289 - Added uptime-ticks value in the core state. DISPATCH-1289 - Record the ingress time (in core ticks) for each delivery. DISPATCH-1289 - Added counters for delayed settlements and management access to those counters. DISPATCH-1289 - Cleaned up the columns in qdstat -a: In-process stats are now under --verbose. Local destinations and local containers (link-route [...]
Date Fri, 15 Mar 2019 15:35:05 GMT
This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 06aad3c9c570841981aaf2fedb8e465010eb8110
Author: Ted Ross <tross@redhat.com>
AuthorDate: Fri Mar 15 11:12:35 2019 -0400

    DISPATCH-1289 - Added link-level logging at the INFO level, including full terminus information
and terminal stats
    DISPATCH-1289 - Added uptime-ticks value in the core state.
    DISPATCH-1289 - Record the ingress time (in core ticks) for each delivery.
    DISPATCH-1289 - Added counters for delayed settlements and management access to those
counters.
    DISPATCH-1289 - Cleaned up the columns in qdstat -a:  In-process stats are now under --verbose.
 Local destinations and local containers (link-routes) now combined into one column.
    DISPATCH-1289 - Fixed test to handle consolidation of ROUTER_CORE into ROUTER (logs).
    DISPATCH-1289 - Added settlement rate computation on links.
    DISPATCH-1289 - Added 'C' prefix to connection numbers in the server.c INFO logs.
---
 docs/man/qdstat.8.adoc                        |  11 ++-
 include/qpid/dispatch/router_core.h           |  13 +++
 python/qpid_dispatch/management/qdrouter.json |  25 ++++++
 src/router_core/agent_link.c                  |  53 ++++++++---
 src/router_core/agent_link.h                  |   2 +-
 src/router_core/agent_router.c                |  22 +++--
 src/router_core/agent_router.h                |   2 +-
 src/router_core/connections.c                 |  53 ++++++++---
 src/router_core/core_timer.c                  |   2 +
 src/router_core/forwarder.c                   |   2 +
 src/router_core/router_core.c                 |   2 +-
 src/router_core/router_core_private.h         |  35 +++++---
 src/router_core/terminus.c                    | 122 ++++++++++++++++++++++++++
 src/router_core/transfer.c                    |  38 ++++++++
 src/router_node.c                             |  23 +++--
 src/server.c                                  |   8 +-
 tests/system_tests_core_client.py             |   2 +-
 tools/qdstat.in                               |  48 ++++++----
 18 files changed, 390 insertions(+), 73 deletions(-)

diff --git a/docs/man/qdstat.8.adoc b/docs/man/qdstat.8.adoc
index 42491a2..a7eff2a 100644
--- a/docs/man/qdstat.8.adoc
+++ b/docs/man/qdstat.8.adoc
@@ -145,6 +145,12 @@ The number of deliveries on this link that were released.
 mod::
 The number of deliveries on this link that were modified.
 
+delay::
+The number of settled deliveries on this link that were unsettled for more than one second.
+
+rate::
+The average rate (over a period of five seconds) at which deliveries have been settled on
this link.
+
 admin::
 The administrative status of the link:
   - 'enabled' - The link is enabled for normal operation.
@@ -212,14 +218,11 @@ in-proc::
 The number of in-process consumers for this address.
 
 local::
-For this router, the number of local consumers for this address.
+For this router, the number of local consumers for this address, or for link-routed addresses,
the number of locally-attached containers that are destinations for this address.
 
 remote::
 The number of remote routers that have at least one consumer for this address.
 
-cntnr::
-The number of locally-attached containers that are destinations for link routes on this address.
-
 in::
 The number of deliveries for this address that entered the network on this router.
 
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 2f7da25..65b1393 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -286,6 +286,19 @@ qdr_terminus_t *qdr_terminus(pn_terminus_t *pn);
 void qdr_terminus_free(qdr_terminus_t *terminus);
 
 /**
+ * qdr_terminus_format
+ *
+ * Write a human-readable representation of the terminus content to the string
+ * in 'output'.
+ *
+ * @param terminus The pointer returned by qdr_terminus()
+ * @param output The string buffer where the result shall be written
+ * @param size Input: the number of bytes availabie in output for writing.  Output: the
+ *             number of bytes remaining after the operation.
+ */
+void qdr_terminus_format(qdr_terminus_t *terminus, char *output, size_t *size);
+
+/**
  * qdr_terminus_copy
  *
  * Copy the contents of the qdr_terminus into a proton terminus
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 5a5900e..b1b253d 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -554,6 +554,16 @@
                     "description":"Number of deliveries whose delivery state was set to MODIFIED
by the router. These deliveries were modified but not processed.",
                     "graph": true
                 },
+                "deliveriesDelayed1Sec": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The total number of settled deliveries that were held
in the router for 1 to 10 seconds."
+                },
+                "deliveriesDelayed10Sec": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The total number of settled deliveries that were held
in the router for more than 10 seconds."
+                },
                 "deliveriesIngress": {
                     "type": "integer",
                     "description":"Number of deliveries that were sent to it by a sender
that is directly attached to the router.",
@@ -1442,6 +1452,21 @@
                     "graph": true,
                     "description": "The total number of modified deliveries."
                 },
+                "deliveriesDelayed1Sec": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The total number of settled deliveries that were held
in the router for 1 to 10 seconds."
+                },
+                "deliveriesDelayed10Sec": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The total number of settled deliveries that were held
in the router for more than 10 seconds."
+                },
+                "settleRate": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The average rate (over five seconds) of settlement in
deliveries-per-second.  This is included for egress links only."
+                },
                 "ingressHistogram": {
                     "type": "list",
                     "description": "For outgoing links on connections with 'normal' role.
 This histogram shows the number of settled deliveries on the link that ingressed the network
at each interior router node."
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 9600aca..4ed4055 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -42,8 +42,11 @@
 #define QDR_LINK_REJECTED_COUNT           18
 #define QDR_LINK_RELEASED_COUNT           19
 #define QDR_LINK_MODIFIED_COUNT           20
-#define QDR_LINK_INGRESS_HISTOGRAM        21
-#define QDR_LINK_PRIORITY                 22
+#define QDR_LINK_DELAYED_1SEC             21
+#define QDR_LINK_DELAYED_10SEC            22
+#define QDR_LINK_INGRESS_HISTOGRAM        23
+#define QDR_LINK_PRIORITY                 24
+#define QDR_LINK_SETTLE_RATE              25
 
 const char *qdr_link_columns[] =
     {"name",
@@ -67,8 +70,11 @@ const char *qdr_link_columns[] =
      "rejectedCount",
      "releasedCount",
      "modifiedCount",
+     "deliveriesDelayed1Sec",
+     "deliveriesDelayed10Sec",
      "ingressHistogram",
      "priority",
+     "settleRate",
      0};
 
 static const char *qd_link_type_name(qd_link_type_t lt)
@@ -89,7 +95,7 @@ static const char *address_key(qdr_address_t *addr)
     return addr && addr->hash_handle ? (const char*) qd_hash_key_by_handle(addr->hash_handle)
: NULL;
 }
 
-static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_link_t *link)
+static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *body, int col,
qdr_link_t *link)
 {
     char *text = 0;
 
@@ -213,6 +219,14 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int
col, qdr_li
         qd_compose_insert_ulong(body, link->modified_deliveries);
         break;
 
+    case QDR_LINK_DELAYED_1SEC:
+        qd_compose_insert_ulong(body, link->deliveries_delayed_1sec);
+        break;
+
+    case QDR_LINK_DELAYED_10SEC:
+        qd_compose_insert_ulong(body, link->deliveries_delayed_10sec);
+        break;
+
     case QDR_LINK_INGRESS_HISTOGRAM:
         if (link->ingress_histogram) {
             qd_compose_start_list(body);
@@ -227,20 +241,39 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int
col, qdr_li
         qd_compose_insert_uint(body, link->priority);
         break;
 
+    case QDR_LINK_SETTLE_RATE: {
+        uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+        if (delta_time > 0) {
+            if (delta_time > QDR_LINK_RATE_DEPTH)
+                delta_time = QDR_LINK_RATE_DEPTH;
+            for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) {
+                link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
+                link->settled_deliveries[link->rate_cursor] = 0;
+            }
+            link->core_ticks = core->uptime_ticks;
+        }
+
+        uint64_t total = 0;
+        for (uint8_t i = 0; i < QDR_LINK_RATE_DEPTH; i++)
+            total += link->settled_deliveries[i];
+        qd_compose_insert_uint(body, total / QDR_LINK_RATE_DEPTH);
+    }
+        break;
+
     default:
         qd_compose_insert_null(body);
         break;
     }
 }
 
-static void qdr_agent_write_link_CT(qdr_query_t *query,  qdr_link_t *link)
+static void qdr_agent_write_link_CT(qdr_core_t *core, qdr_query_t *query,  qdr_link_t *link)
 {
     qd_composed_field_t *body = query->body;
 
     qd_compose_start_list(body);
     int i = 0;
     while (query->columns[i] >= 0) {
-        qdr_agent_write_column_CT(body, query->columns[i], link);
+        qdr_agent_write_column_CT(core, body, query->columns[i], link);
         i++;
     }
     qd_compose_end_list(body);
@@ -285,7 +318,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int
offset)
     //
     // Write the columns of the link into the response body.
     //
-    qdr_agent_write_link_CT(query, link);
+    qdr_agent_write_link_CT(core, query, link);
 
     //
     // Advance to the next address
@@ -314,7 +347,7 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query)
         //
         // Write the columns of the link entity into the response body.
         //
-        qdr_agent_write_link_CT(query, link);
+        qdr_agent_write_link_CT(core, query, link);
 
         //
         // Advance to the next link
@@ -330,13 +363,13 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query)
 }
 
 
-static void qdr_manage_write_response_map_CT(qd_composed_field_t *body, qdr_link_t *link)
+static void qdr_manage_write_response_map_CT(qdr_core_t *core, qd_composed_field_t *body,
qdr_link_t *link)
 {
     qd_compose_start_map(body);
 
     for(int i = 0; i < QDR_LINK_COLUMN_COUNT; i++) {
         qd_compose_insert_string(body, qdr_link_columns[i]);
-        qdr_agent_write_column_CT(body, i, link);
+        qdr_agent_write_column_CT(core, body, i, link);
     }
 
     qd_compose_end_map(body);
@@ -386,7 +419,7 @@ static void qdra_link_update_set_status(qdr_core_t *core, qdr_query_t
*query, qd
 {
     if (link) {
         //link->admin_state = qd_iterator_copy(adm_state);
-        qdr_manage_write_response_map_CT(query->body, link);
+        qdr_manage_write_response_map_CT(core, query->body, link);
         query->status = QD_AMQP_OK;
     }
     else {
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index cd92c1b..80fa8a4 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t          *core,
                          qdr_query_t         *query,
                          qd_parsed_field_t   *in_body);
 
-#define QDR_LINK_COLUMN_COUNT  23
+#define QDR_LINK_COLUMN_COUNT  26
 
 const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index f9303c8..d7ea523 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -43,11 +43,13 @@
 #define QDR_ROUTER_REJECTED_DELIVERIES                 16
 #define QDR_ROUTER_RELEASED_DELIVERIES                 17
 #define QDR_ROUTER_MODIFIED_DELIVERIES                 18
-#define QDR_ROUTER_DELIVERIES_INGRESS                  19
-#define QDR_ROUTER_DELIVERIES_EGRESS                   20
-#define QDR_ROUTER_DELIVERIES_TRANSIT                  21
-#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER  22
-#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER   23
+#define QDR_ROUTER_DELAYED_1SEC                        19
+#define QDR_ROUTER_DELAYED_10SEC                       20
+#define QDR_ROUTER_DELIVERIES_INGRESS                  21
+#define QDR_ROUTER_DELIVERIES_EGRESS                   22
+#define QDR_ROUTER_DELIVERIES_TRANSIT                  23
+#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER  24
+#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER   25
 
 
 const char *qdr_router_columns[] =
@@ -70,6 +72,8 @@ const char *qdr_router_columns[] =
      "rejectedDeliveries",
      "releasedDeliveries",
      "modifiedDeliveries",
+     "deliveriesDelayed1Sec",
+     "deliveriesDelayed10Sec",
      "deliveriesIngress",
      "deliveriesEgress",
      "deliveriesTransit",
@@ -174,6 +178,14 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int
col, qdr_co
         qd_compose_insert_ulong(body, core->modified_deliveries);
         break;
 
+    case QDR_ROUTER_DELAYED_1SEC:
+        qd_compose_insert_ulong(body, core->deliveries_delayed_1sec);
+        break;
+
+    case QDR_ROUTER_DELAYED_10SEC:
+        qd_compose_insert_ulong(body, core->deliveries_delayed_10sec);
+        break;
+
     case QDR_ROUTER_DELIVERIES_INGRESS:
         qd_compose_insert_ulong(body, core->deliveries_ingress);
         break;
diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h
index dffc0eb..6a6e35f 100644
--- a/src/router_core/agent_router.h
+++ b/src/router_core/agent_router.h
@@ -21,7 +21,7 @@
 
 #include "router_core_private.h"
 
-#define QDR_ROUTER_COLUMN_COUNT  24
+#define QDR_ROUTER_COLUMN_COUNT  26
 
 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 8498ff3..0c58da4 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -23,6 +23,7 @@
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 #include <strings.h>
+#include <inttypes.h>
 #include "router_core_private.h"
 #include "core_link_endpoint.h"
 
@@ -475,6 +476,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->credit_pending = conn->link_capacity;
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
+    link->core_ticks     = conn->core->uptime_ticks;
     link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
 
     link->strip_annotations_in  = conn->strip_annotations_in;
@@ -816,7 +818,7 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t
*link)
 }
 
 
-static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
+static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link,
const char *log_text)
 {
     //
     // Remove the link from the master list of links
@@ -900,6 +902,17 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn,
qdr_li
     free(link->ingress_histogram);
     free(link->insert_prefix);
     free(link->strip_prefix);
+
+    //
+    // Log the link closure
+    //
+    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] %s: del=%"PRIu64" presett=%"PRIu64"
psdrop=%"PRIu64
+           " acc=%"PRIu64" rej=%"PRIu64" rel=%"PRIu64" mod=%"PRIu64" delay1=%"PRIu64" delay10=%"PRIu64,
+           conn->identity, link->identity, log_text, link->total_deliveries, link->presettled_deliveries,
+           link->dropped_presettled_deliveries, link->accepted_deliveries, link->rejected_deliveries,
+           link->released_deliveries, link->modified_deliveries, link->deliveries_delayed_1sec,
+           link->deliveries_delayed_10sec);
+
     free_qdr_link_t(link);
 }
 
@@ -931,9 +944,10 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     qdr_generate_link_name("qdlink", link->name, QD_DISCRIMINATOR_SIZE + 8);
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
-    link->insert_prefix = 0;
-    link->strip_prefix = 0;
-    link->attach_count = 1;
+    link->insert_prefix  = 0;
+    link->strip_prefix   = 0;
+    link->attach_count   = 1;
+    link->core_ticks     = core->uptime_ticks;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -1196,7 +1210,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t
*action, boo
 
     qdr_connection_t *conn = action->args.connection.conn;
 
-
     //
     // Deactivate routes associated with this connection
     //
@@ -1236,7 +1249,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t
*action, boo
         //
         // Clean up the link and all its associated state.
         //
-        qdr_link_cleanup_CT(core, conn, link); // link_cleanup disconnects and frees the
ref.
+        qdr_link_cleanup_CT(core, conn, link, "Link closed due to connection loss"); // link_cleanup
disconnects and frees the ref.
         link_ref = DEQ_HEAD(conn->links);
     }
 
@@ -1260,6 +1273,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t
*action, boo
 
     qdrc_event_conn_raise(core, QDRC_EVENT_CONN_CLOSED, conn);
 
+    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Connection Closed", conn->identity);
+
     DEQ_REMOVE(core->open_connections, conn);
     qdr_connection_free(conn);
 }
@@ -1375,6 +1390,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*act
         qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true);
         qdr_terminus_free(source);
         qdr_terminus_free(target);
+        qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Router attach forbidden on non-inter-router
connection", conn->identity);
         return;
     }
 
@@ -1388,6 +1404,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*act
         qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true);
         qdr_terminus_free(source);
         qdr_terminus_free(target);
+        qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach forbidden on inter-router
connection", conn->identity);
         return;
     }
 
@@ -1412,6 +1429,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*act
                     qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION,
true);
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
+                    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed
- no address lookup handler", conn->identity);
                     return;
                 }
             }
@@ -1439,6 +1457,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*act
                 qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION,
true);
                 qdr_terminus_free(source);
                 qdr_terminus_free(target);
+                    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed
- no address lookup handler", conn->identity);
                 return;
             }
             break;
@@ -1460,6 +1479,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*act
             break;
         }
     }
+
+    char   source_str[1000];
+    char   target_str[1000];
+    size_t source_len = 1000;
+    size_t target_len = 1000;
+
+    qdr_terminus_format(source, source_str, &source_len);
+    qdr_terminus_format(target, target_str, &target_len);
+
+    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] Link attached: dir=%s source=%s
target=%s",
+           conn->identity, link->identity, dir == QD_INCOMING ? "in" : "out", source_str,
target_str);
 }
 
 
@@ -1615,9 +1645,8 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
             //
             // If the link is completely detached, release its resources
             //
-            if (link->detach_send_done) {
-                qdr_link_cleanup_CT(core, conn, link);
-            }
+            if (link->detach_send_done)
+                qdr_link_cleanup_CT(core, conn, link, "Link detached");
 
             return;
         }
@@ -1709,11 +1738,11 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
             qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED);
         } else {
             // no detach can be sent out because the connection was lost
-            qdr_link_cleanup_CT(core, conn, link);
+            qdr_link_cleanup_CT(core, conn, link, "Link lost");
         }
     } else if (link->detach_send_done) {  // detach count indicates detach has been scheduled
         // I/O thread is finished sending detach, ok to free link now
-        qdr_link_cleanup_CT(core, conn, link);
+        qdr_link_cleanup_CT(core, conn, link, "Link detached");
     }
 
     //
@@ -1741,7 +1770,7 @@ static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action,
bool
         link->detach_send_done = true;
         if (link->conn && link->detach_received) {
             // link is fully detached
-            qdr_link_cleanup_CT(core, link->conn, link);
+            qdr_link_cleanup_CT(core, link->conn, link, "Link detached");
         }
     }
 }
diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c
index a20cd64..d34beb1 100644
--- a/src/router_core/core_timer.c
+++ b/src/router_core/core_timer.c
@@ -109,6 +109,8 @@ void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool
discard)
     if (discard)
         return;
 
+    core->uptime_ticks++;
+
     qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers);
     qdr_core_timer_t *timer_next = 0;
 
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 9d2437c..1d6a208 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -125,6 +125,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t
*in
     out_dlv->tag_length = 8;
     out_dlv->error      = 0;
 
+    out_dlv->ingress_time  = in_dlv ? in_dlv->ingress_time  : core->uptime_ticks;
     out_dlv->ingress_index = in_dlv ? in_dlv->ingress_index : -1;
 
     //
@@ -185,6 +186,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t
*link
                 free_qdr_link_work_t(dlv->link_work);
                 dlv->link_work = 0;
             }
+            dlv->disposition = PN_RELEASED;
             qdr_delivery_decref_CT(core, dlv, "qdr_forward_drop_presettled_CT_LH - remove
from link-work list");
 
             // Increment the presettled_dropped_deliveries on the out_link
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 5b5fbec..a66a703 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -55,7 +55,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char
*area,
     //
     // Set up the logging sources for the router core
     //
-    core->log       = qd_log_source("ROUTER_CORE");
+    core->log       = qd->router->log_source;
     core->agent_log = qd_log_source("AGENT");
 
     //
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ba5f790..eb58837 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -393,6 +393,7 @@ struct qdr_delivery_t {
     qd_iterator_t          *to_addr;
     qd_iterator_t          *origin;
     uint64_t                disposition;
+    uint32_t                ingress_time;
     pn_data_t              *extension_state;
     qdr_error_t            *error;
     bool                    settled;
@@ -431,6 +432,8 @@ typedef enum {
     QDR_LINK_OPER_IDLE
 } qdr_link_oper_status_t;
 
+#define QDR_LINK_RATE_DEPTH 5
+
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
     qdr_core_t              *core;
@@ -480,8 +483,13 @@ struct qdr_link_t {
     uint64_t  rejected_deliveries;
     uint64_t  released_deliveries;
     uint64_t  modified_deliveries;
+    uint64_t  deliveries_delayed_1sec;
+    uint64_t  deliveries_delayed_10sec;
+    uint64_t  settled_deliveries[QDR_LINK_RATE_DEPTH];
     uint64_t *ingress_histogram;
     uint8_t   priority;
+    uint8_t   rate_cursor;
+    uint32_t  core_ticks;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -762,6 +770,7 @@ struct qdr_core_t {
     qdr_core_timer_list_t    scheduled_timers;
     qdr_general_work_list_t  work_list;
     qd_timer_t              *work_timer;
+    uint32_t                 uptime_ticks;
 
     qdr_connection_list_t open_connections;
     qdr_connection_t     *active_edge_connection;
@@ -847,19 +856,19 @@ struct qdr_core_t {
     qdr_delivery_cleanup_list_t  delivery_cleanup_list;  ///< List of delivery cleanup
items to be processed in an IO thread
 
     // Overall delivery counters
-    uint64_t           presettled_deliveries;
-    uint64_t           dropped_presettled_deliveries;
-    uint64_t           accepted_deliveries;
-    uint64_t           rejected_deliveries;
-    uint64_t           released_deliveries;
-    uint64_t           modified_deliveries;
-    uint64_t           deliveries_ingress;
-    uint64_t           deliveries_egress;
-    uint64_t           deliveries_transit;
-    uint64_t           deliveries_egress_route_container;
-    uint64_t           deliveries_ingress_route_container;
-
-
+    uint64_t  presettled_deliveries;
+    uint64_t  dropped_presettled_deliveries;
+    uint64_t  accepted_deliveries;
+    uint64_t  rejected_deliveries;
+    uint64_t  released_deliveries;
+    uint64_t  modified_deliveries;
+    uint64_t  deliveries_ingress;
+    uint64_t  deliveries_egress;
+    uint64_t  deliveries_transit;
+    uint64_t  deliveries_egress_route_container;
+    uint64_t  deliveries_ingress_route_container;
+    uint64_t  deliveries_delayed_1sec;
+    uint64_t  deliveries_delayed_10sec;
 };
 
 struct qdr_terminus_t {
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 58ec9e5..b88befc 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -20,6 +20,7 @@
 #include "router_core_private.h"
 #include <strings.h>
 #include <stdio.h>
+#include <inttypes.h>
 
 ALLOC_DEFINE(qdr_terminus_t);
 
@@ -75,6 +76,127 @@ void qdr_terminus_free(qdr_terminus_t *term)
 }
 
 
+void qdr_terminus_format(qdr_terminus_t *term, char *output, size_t *size)
+{
+    size_t len = snprintf(output, *size, "{");
+
+    output += len;
+    *size  -= len;
+    len     = 0;
+    
+    do {
+        if (term == 0)
+            break;
+
+        if (term->coordinator) {
+            len = snprintf(output, *size, "<coordinator>");
+            break;
+        }
+
+        if (term->dynamic)
+            len = snprintf(output, *size, "<dynamic>");
+        else if (term->address && term->address->iterator) {
+            qd_iterator_reset_view(term->address->iterator, ITER_VIEW_ALL);
+            len = qd_iterator_ncopy(term->address->iterator, (unsigned char*) output,
*size);
+        } else if (term->address == 0)
+            len = snprintf(output, *size, "<none>");
+
+        output += len;
+        *size  -= len;
+
+        char *text = "";
+        switch (term->durability) {
+        case PN_NONDURABLE:                              break;
+        case PN_CONFIGURATION: text = " dur:config";     break;
+        case PN_DELIVERIES:    text = " dur:deliveries"; break;
+        }
+
+        len     = snprintf(output, *size, text);
+        output += len;
+        *size  -= len;
+
+        switch (term->expiry_policy) {
+        case PN_EXPIRE_WITH_LINK:       text = " expire:link";  break;
+        case PN_EXPIRE_WITH_SESSION:    text = " expire:sess";  break;
+        case PN_EXPIRE_WITH_CONNECTION: text = " expire:conn";  break;
+        case PN_EXPIRE_NEVER:           text = "";              break;
+        }
+
+        len     = snprintf(output, *size, text);
+        output += len;
+        *size  -= len;
+
+        switch (term->distribution_mode) {
+        case PN_DIST_MODE_UNSPECIFIED: text = "";             break;
+        case PN_DIST_MODE_COPY:        text = " dist:copy";   break;
+        case PN_DIST_MODE_MOVE:        text = " dist:move";   break;
+        }
+
+        len     = snprintf(output, *size, text);
+        output += len;
+        *size  -= len;
+
+        if (term->timeout > 0) {
+            len     = snprintf(output, *size, " timeout:%"PRIu32, term->timeout);
+            output += len;
+            *size  -= len;
+        }
+
+        if (term->capabilities && pn_data_size(term->capabilities) > 0)
{
+            len     = snprintf(output, *size, " caps:");
+            output += len;
+            *size  -= len;
+
+            len = *size;
+            pn_data_format(term->capabilities, output, &len);
+            output += len;
+            *size  -= len;
+        }
+
+        if (term->filter && pn_data_size(term->filter) > 0) {
+            len     = snprintf(output, *size, " flt:");
+            output += len;
+            *size  -= len;
+
+            len = *size;
+            pn_data_format(term->filter, output, &len);
+            output += len;
+            *size  -= len;
+        }
+
+        if (term->outcomes && pn_data_size(term->outcomes) > 0) {
+            len     = snprintf(output, *size, " outcomes:");
+            output += len;
+            *size  -= len;
+
+            len = *size;
+            pn_data_format(term->outcomes, output, &len);
+            output += len;
+            *size  -= len;
+        }
+
+        if (term->properties && pn_data_size(term->properties) > 0) {
+            len     = snprintf(output, *size, " props:");
+            output += len;
+            *size  -= len;
+
+            len = *size;
+            pn_data_format(term->properties, output, &len);
+            output += len;
+            *size  -= len;
+        }
+
+        len = 0;
+    } while (false);
+
+    output += len;
+    *size  -= len;
+    
+    len = snprintf(output, *size, "}");
+    *size -= len;
+}
+
+
 void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to)
 {
     if (!from) {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 08b6cde..a9caffd 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -554,17 +554,22 @@ void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t
*delive
 {
     qdr_link_t *link = delivery->link;
     if (link) {
+        bool do_rate = false;
+
         if (delivery->presettled) {
+            do_rate = delivery->disposition != PN_RELEASED;
             link->presettled_deliveries++;
             if (link->link_direction ==  QD_INCOMING && link->link_type ==
QD_LINK_ENDPOINT)
                 core->presettled_deliveries++;
         }
         else if (delivery->disposition == PN_ACCEPTED) {
+            do_rate = true;
             link->accepted_deliveries++;
             if (link->link_direction ==  QD_INCOMING)
                 core->accepted_deliveries++;
         }
         else if (delivery->disposition == PN_REJECTED) {
+            do_rate = true;
             link->rejected_deliveries++;
             if (link->link_direction ==  QD_INCOMING)
                 core->rejected_deliveries++;
@@ -580,8 +585,36 @@ void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t
*delive
                 core->modified_deliveries++;
         }
 
+        uint32_t delay = core->uptime_ticks - delivery->ingress_time;
+        if (delay > 10) {
+            link->deliveries_delayed_10sec++;
+            if (link->link_direction ==  QD_INCOMING)
+                core->deliveries_delayed_10sec++;
+        } else if (delay > 1) {
+            link->deliveries_delayed_1sec++;
+            if (link->link_direction ==  QD_INCOMING)
+                core->deliveries_delayed_1sec++;
+        }
+
         if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram)
             link->ingress_histogram[delivery->ingress_index]++;
+
+        //
+        // Compute the settlement rate
+        //
+        if (do_rate) {
+            uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+            if (delta_time > 0) {
+                if (delta_time > QDR_LINK_RATE_DEPTH)
+                    delta_time = QDR_LINK_RATE_DEPTH;
+                for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++)
{
+                    link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
+                    link->settled_deliveries[link->rate_cursor] = 0;
+                }
+                link->core_ticks = core->uptime_ticks;
+            }
+            link->settled_deliveries[link->rate_cursor]++;
+        }
     }
 }
 
@@ -1019,6 +1052,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action,
bool dis
     qdr_link_t     *link = dlv->link;
 
     //
+    // Record the ingress time so we can track the age of this delivery.
+    //
+    dlv->ingress_time = core->uptime_ticks;
+
+    //
     // If the link is an edge link, mark this delivery as via-edge
     //
     dlv->via_edge = link->edge;
diff --git a/src/router_node.c b/src/router_node.c
index 6e9cc25..e87adcd 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -292,7 +292,7 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link,
qd_messa
         const char *src = pn_terminus_get_address(pn_link_source(pn_link));
         const char *tgt = pn_terminus_get_address(pn_link_target(pn_link));
         qd_log(qd_message_log_source(), QD_LOG_TRACE,
-               "[%"PRIu64"]: %s %s on link '%s' (%s -> %s)",
+               "[C%"PRIu64"]: %s %s on link '%s' (%s -> %s)",
                qd_connection_connection_id(conn),
                pn_link_is_sender(pn_link) ? "Sent" : "Received",
                msg_str,
@@ -1145,15 +1145,18 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t
*conn, bool
     }
 
 
-    qdr_connection_info_t *connection_info = qdr_connection_info(tport && pn_transport_is_encrypted(tport),
-                                                                 tport && pn_transport_is_authenticated(tport),
+    bool encrypted     = tport && pn_transport_is_encrypted(tport);
+    bool authenticated = tport && pn_transport_is_authenticated(tport);
+
+    qdr_connection_info_t *connection_info = qdr_connection_info(encrypted,
+                                                                 authenticated,
                                                                  conn->opened,
-                                                                 (char *)mech,
+                                                                 (char*) mech,
                                                                  conn->connector ? QD_OUTGOING
: QD_INCOMING,
                                                                  host,
                                                                  proto,
                                                                  cipher,
-                                                                 (char *)user,
+                                                                 (char*) user,
                                                                  container,
                                                                  props,
                                                                  ssl_ssf,
@@ -1168,6 +1171,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t
*conn, bool
                           vhost,
                           connection_info,
                           bind_connection_context, conn);
+
+    char   props_str[1000];
+    size_t props_len = 1000;
+
+    pn_data_format(props, props_str, &props_len);
+
+    qd_log(router->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection Opened: dir=%s host=%s
vhost=%s encrypted=%s"
+           " auth=%s user=%s container_id=%s props=%s",
+           connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto
: "no",
+           authenticated ? mech : "no", (char*) user, container, props_str);
 }
 
 static int AMQP_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
diff --git a/src/server.c b/src/server.c
index 760126d..bcd4a82 100644
--- a/src/server.c
+++ b/src/server.c
@@ -692,7 +692,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
         pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
         sys_mutex_unlock(ctx->server->lock);
 
-        qd_log(ctx->server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Accepted connection
to %s from %s",
+        qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Accepted connection
to %s from %s",
                ctx->connection_id, name, ctx->rhost_port);
     } else if (ctx->connector) { /* Establishing an outgoing connection */
         config = &ctx->connector->config;
@@ -969,14 +969,14 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t
*pn_co
                 qd_increment_conn_index(ctx);
                 const qd_server_config_t *config = &ctx->connector->config;
                 if (condition  && pn_condition_is_set(condition)) {
-                    qd_log(qd_server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection
to %s failed: %s %s", ctx->connection_id, config->host_port,
+                    qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection
to %s failed: %s %s", ctx->connection_id, config->host_port,
                            pn_condition_get_name(condition), pn_condition_get_description(condition));
                 } else {
-                    qd_log(qd_server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection
to %s failed", ctx->connection_id, config->host_port);
+                    qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection
to %s failed", ctx->connection_id, config->host_port);
                 }
             } else if (ctx && ctx->listener) { /* Incoming connection */
                 if (condition && pn_condition_is_set(condition)) {
-                    qd_log(ctx->server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection
from %s (to %s) failed: %s %s",
+                    qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection
from %s (to %s) failed: %s %s",
                            ctx->connection_id, ctx->rhost_port, ctx->listener->config.host_port,
pn_condition_get_name(condition),
                            pn_condition_get_description(condition));
                 }
diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py
index 9030bcd..742c6e0 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -236,7 +236,7 @@ class TestCallTimeout(TestService):
         def on_timer_task(self, event):
             log = self.service.qm.get_log()
             for e in log:
-                if (e[0] == 'ROUTER_CORE'and e[1] == 'error'
+                if (e[0] == 'ROUTER' and e[1] == 'error'
                         and e[2] == 'client test request done '
                                     'error=Timed out'):
                     # yes this is the line you're looking for:
diff --git a/tools/qdstat.in b/tools/qdstat.in
index b0846e0..51c2e20 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -321,6 +321,11 @@ class BusManager(Node):
             rows.append(('Rejected Count', router.rejectedDeliveries))
             rows.append(('Released Count', router.releasedDeliveries))
             rows.append(('Modified Count', router.modifiedDeliveries))
+            try:
+                rows.append(('Deliveries Delayed > 1sec', router.deliveriesDelayed1Sec))
+                rows.append(('Deliveries Delayed > 10sec', router.deliveriesDelayed10Sec))
+            except:
+                pass
             rows.append(('Ingress Count', router.deliveriesIngress))
             rows.append(('Egress Count', router.deliveriesEgress))
             rows.append(('Transit Count', router.deliveriesTransit))
@@ -351,12 +356,14 @@ class BusManager(Node):
         cols = ('linkType', 'linkDir', 'connectionId', 'identity', 'peer', 'owningAddr',
                 'capacity', 'undeliveredCount', 'unsettledCount', 'deliveryCount',
                 'presettledCount', 'droppedPresettledCount', 'acceptedCount', 'rejectedCount',
'releasedCount',
-                'modifiedCount', 'adminStatus', 'operStatus', 'linkName', 'priority')
+                'modifiedCount', 'deliveriesDelayed1Sec', 'deliveriesDelayed10Sec', 'adminStatus',
'operStatus',
+                'linkName', 'priority', 'settleRate')
 
         objects = self.query('org.apache.qpid.dispatch.router.link', cols, limit=self.opts.limit)
 
         has_dropped_presettled_count = False
         has_priority = False
+        has_delayed  = False
 
         if objects:
             first_row = objects[0]
@@ -365,15 +372,16 @@ class BusManager(Node):
                     has_dropped_presettled_count = True
                 if hasattr(first_row, 'priority'):
                     has_priority = True
+                if hasattr(first_row, 'deliveriesDelayed1Sec'):
+                    has_delayed = True
 
         if has_priority:
             heads.append(Header("pri"))
         heads.append(Header("undel"))
         heads.append(Header("unsett"))
-        heads.append(Header("del"))
+        heads.append(Header("deliv"))
         heads.append(Header("presett"))
 
-
         if has_dropped_presettled_count:
             heads.append(Header("psdrop"))
 
@@ -381,9 +389,12 @@ class BusManager(Node):
         heads.append(Header("rej"))
         heads.append(Header("rel"))
         heads.append(Header("mod"))
-        heads.append(Header("admin"))
-        heads.append(Header("oper"))
+        if has_delayed:
+            heads.append(Header("delay"))
+            heads.append(Header("rate"))
         if self.opts.verbose:
+            heads.append(Header("admin"))
+            heads.append(Header("oper"))
             heads.append(Header("name"))
 
         for link in objects:
@@ -409,9 +420,12 @@ class BusManager(Node):
             row.append(link.rejectedCount)
             row.append(link.releasedCount)
             row.append(link.modifiedCount)
-            row.append(link.adminStatus)
-            row.append(link.operStatus)
+            if has_delayed:
+                row.append(link.deliveriesDelayed1Sec + link.deliveriesDelayed10Sec)
+                row.append(link.settleRate)
             if self.opts.verbose:
+                row.append(link.adminStatus)
+                row.append(link.operStatus)
                 row.append(link.linkName)
             rows.append(row)
         title = "Router Links"
@@ -491,15 +505,16 @@ class BusManager(Node):
 
         if has_priority:
             heads.append(Header("pri"))
-        heads.append(Header("in-proc", Header.COMMAS))
+        if self.opts.verbose:
+            heads.append(Header("in-proc", Header.COMMAS))
         heads.append(Header("local", Header.COMMAS))
         heads.append(Header("remote", Header.COMMAS))
-        heads.append(Header("cntnr", Header.COMMAS))
         heads.append(Header("in", Header.COMMAS))
         heads.append(Header("out", Header.COMMAS))
         heads.append(Header("thru", Header.COMMAS))
-        heads.append(Header("to-proc", Header.COMMAS))
-        heads.append(Header("from-proc", Header.COMMAS))
+        if self.opts.verbose:
+            heads.append(Header("to-proc", Header.COMMAS))
+            heads.append(Header("from-proc", Header.COMMAS))
 
         for addr in objects:
             row = []
@@ -509,15 +524,16 @@ class BusManager(Node):
             row.append(addr.distribution)
             if has_priority:
                 row.append(addr.priority if addr.priority >= 0 else "")
-            row.append(addr.inProcess)
-            row.append(addr.subscriberCount)
+            if self.opts.verbose:
+                row.append(addr.inProcess)
+            row.append(addr.containerCount if addr.name[0] in 'CDEF' else addr.subscriberCount)
             row.append(addr.remoteCount)
-            row.append(addr.containerCount)
             row.append(addr.deliveriesIngress)
             row.append(addr.deliveriesEgress)
             row.append(addr.deliveriesTransit)
-            row.append(addr.deliveriesToContainer)
-            row.append(addr.deliveriesFromContainer)
+            if self.opts.verbose:
+                row.append(addr.deliveriesToContainer)
+                row.append(addr.deliveriesFromContainer)
             rows.append(row)
         title = "Router Addresses"
         sorter = Sorter(heads, rows, 'addr', 0, True)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message