celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 06/11: Add interceptor
Date Tue, 06 Oct 2020 19:04:50 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 050c3962ec60fbb797bc2265656fb7bab8c85537
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Mon Jun 29 22:15:45 2020 +0200

    Add interceptor
---
 .../src/pubsub_tcp_topic_receiver.c                | 71 ++++++++++++++--------
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  8 ++-
 2 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 71890d9..a795f92 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -38,6 +38,7 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
@@ -67,6 +68,7 @@ struct pubsub_tcp_topic_receiver {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -164,6 +166,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     const char *staticServerEndPointUrls = NULL;
     const char *staticConnectUrls = NULL;
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
     staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
 
     if (topicProperties != NULL) {
@@ -344,6 +347,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
             receiver->socketHandler = NULL;
         }
 
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -553,31 +557,48 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle,
+                                     msgSer->msgName,
+                                     msgSer->msgId,
+                                     deSerializedMsg,
+                                     message->metadata.metadata,
+                                     &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
+                                                                   msgType,
+                                                                   msgId,
+                                                                   deSerializedMsg,
+                                                                   metadata);
+                        if (!release && hashMapIterator_hasNext(&iter)) {
+                            //receive function has taken ownership and still more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+                                       msgSer->msgName,
+                                       receiver->scope == NULL ? "(null)" : receiver->scope,
+                                       receiver->topic);
+                                break;
+                            }
+                            release = true;
                         }
-                        release = true;
                     }
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
+                    updateReceiveCount += 1;
                 }
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
-                }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
@@ -644,10 +665,7 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
@@ -671,8 +689,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
         hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
         while (hashMapIterator_hasNext(&iter2)) {
             hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins),
-                                                 sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
             result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
             int k = 0;
             hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 29dc50e..754e769 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -35,6 +35,7 @@
 #include "celix_constants.h"
 #include <signal.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS              2
 #define TCP_BIND_MAX_RETRY                      10
@@ -59,6 +60,7 @@ struct pubsub_tcp_topic_sender {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
@@ -144,6 +146,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     }
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
                                                                    PSA_TCP_DEFAULT_METRICS_ENABLED);
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
     bool isEndpoint = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
@@ -312,6 +315,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -533,7 +537,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        if (status == CELIX_SUCCESS /*ser ok*/ && cont) {
             pubsub_protocol_message_t message;
             message.metadata.metadata = NULL;
             message.payload.payload = NULL;
@@ -561,6 +566,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i
                     status = -1;
                     sendOk = false;
                 }
+                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
                 if (message.metadata.metadata)
                     celix_properties_destroy(message.metadata.metadata);
                 if (serializedIoVecOutput) {


Mime
View raw message