celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [celix] 01/05: Adds interceptor support to the pubsub websocket
Date Mon, 26 Jul 2021 18:55:01 GMT
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 6bf1ef1e7e71b4fea6589f4639eb3b3b30e09de3
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
AuthorDate: Mon Jul 26 15:35:37 2021 +0200

    Adds interceptor support to the pubsub websocket
---
 .../v2/src/pubsub_tcp_topic_receiver.c             |  11 +-
 .../v2/src/pubsub_websocket_common.h               |   2 +-
 .../v2/src/pubsub_websocket_topic_receiver.c       | 211 ++++++++++-----------
 .../v2/src/pubsub_websocket_topic_sender.c         |  11 ++
 .../v2/src/pubsub_zmq_topic_receiver.c             |  11 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |   2 +-
 6 files changed, 126 insertions(+), 122 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 36816f1..e9ef6b4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -446,7 +446,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg
                                                                              message->header.msgMinorVersion,
                                                                              &deSerializeBuffer, 0, msg);
                 if (status != CELIX_SUCCESS) {
-                    L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
+                    L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
                            receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
                     break;
                 }
@@ -484,17 +484,14 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes
         }
 
         if (status == CELIX_SUCCESS) {
-            uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
-                                                                  deSerializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata);
             if (cont) {
                 bool release;
                 callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata);
-                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
-                                                                 deSerializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
                 }
             }
         } else {
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
index 4c22319..8a764d1 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h
@@ -27,7 +27,7 @@
 
 
 struct pubsub_websocket_msg_header {
-    const char *id; //FQN
+    const char *fqn;
     uint8_t major;
     uint8_t minor;
     uint32_t seqNr;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index ea997e9..56f4008 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -37,12 +37,14 @@
 #include <jansson.h>
 #include <pubsub_utils.h>
 #include <celix_api.h>
+#include "pubsub_interceptors_handler.h"
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
 #endif
 
-
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
@@ -72,6 +74,7 @@ struct pubsub_websocket_topic_receiver {
     char *uri;
 
     pubsub_serializer_handler_t* serializerHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     celix_websocket_service_t sockSvc;
     long svcId;
@@ -93,7 +96,7 @@ struct pubsub_websocket_topic_receiver {
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
-        hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t
+        hash_map_t *map; //key = long svc id, value = psa_websocket_subscriber_entry_t
         bool allInitialized;
     } subscribers;
 };
@@ -111,13 +114,13 @@ typedef struct psa_websocket_requested_connection_entry {
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
-    hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
+    pubsub_subscriber_t* subscriberSvc;
     bool initialized; //true if the init function is called through the receive thread
 } psa_websocket_subscriber_entry_t;
 
 
-static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
-static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props);
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props);
 static void* psa_websocket_recvThread(void * data);
 static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver);
 static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver);
@@ -138,6 +141,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
     receiver->serializerHandler = serializerHandler;
+    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE,
+                                                                     pubsub_serializerHandler_getSerializationType(serializerHandler));
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->admin = admin;
@@ -166,8 +171,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = receiver;
-        opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber;
+        opts.addWithProperties = pubsub_websocketTopicReceiver_addSubscriber;
+        opts.removeWithProperties = pubsub_websocketTopicReceiver_removeSubscriber;
 
         receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
@@ -262,22 +267,13 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         celix_bundleContext_unregisterService(receiver->ctx, receiver->svcId);
 
         celixThreadMutex_lock(&receiver->subscribers.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (entry != NULL)  {
-                hashMap_destroy(entry->subscriberServices, false, false);
-                free(entry);
-            }
-
-        }
-        hashMap_destroy(receiver->subscribers.map, false, false);
+        hashMap_destroy(receiver->subscribers.map, false, true);
 
 
         celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
         celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (entry != NULL) {
@@ -307,6 +303,8 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         }
         celix_arrayList_destroy(receiver->recvBuffer.list);
 
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
+
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
@@ -394,10 +392,9 @@ void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receive
     free(key);
 }
 
-static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
+static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) {
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
     const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (receiver->scope == NULL){
@@ -414,96 +411,113 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
         return;
     }
 
+    psa_websocket_subscriber_entry_t* entry = calloc(1, sizeof(*entry));
+    entry->subscriberSvc = svc;
+    entry->initialized = false;
+
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry == NULL) {
-        //new create entry
-        entry = calloc(1, sizeof(*entry));
-        entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
-        entry->initialized = false;
-        hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
-    }
-    hashMap_put(entry->subscriberServices, (void*)svcId, svc);
+    hashMap_put(receiver->subscribers.map, (void*)svcId, entry);
+    receiver->subscribers.allInitialized = false;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) {
+static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) {
     pubsub_websocket_topic_receiver_t *receiver = handle;
 
-    long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_websocket_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId);
+    free(entry);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
 
+static void callReceivers(
+        pubsub_websocket_topic_receiver_t *receiver,
+        uint32_t msgId,
+        const pubsub_websocket_msg_header_t* header,
+        const char *payload,
+        size_t payloadSize,
+        void** msg,
+        bool* release,
+        const celix_properties_t* metadata) {
+    *release = true;
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId);
-    if (entry != NULL) {
-        hashMap_remove(entry->subscriberServices, (void*)svcId);
-    }
-    if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
-        //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        hashMap_destroy(entry->subscriberServices, false, false);
-        free(entry);
+    hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_websocket_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL && entry->subscriberSvc->receive != NULL) {
+            entry->subscriberSvc->receive(entry->subscriberSvc->handle, header->fqn, msgId, *msg, metadata, release);
+            if (!(*release)) {
+                //receive function has taken ownership, deserialize again for new message
+                struct iovec deSerializeBuffer;
+                deSerializeBuffer.iov_base = (void*) payload;
+                deSerializeBuffer.iov_len = payloadSize;
+                celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler,
+                                                                             msgId,
+                                                                             header->major,
+                                                                             header->minor,
+                                                                             &deSerializeBuffer, 0, msg);
+                if (status != CELIX_SUCCESS) {
+                    L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn,
+                           receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                    break;
+                }
+            }
+            *release = true;
+        }
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
-    //NOTE receiver->subscribers.mutex locked
-
-    uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id);
-
+static void processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t* header, const char *payload, size_t payloadSize) {
+    uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, header->fqn);
     if (msgId == 0) {
-        L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
+        L_WARN("Cannot find msg id for msg fqn %s", header->fqn);
         return;
     }
 
-    void *deSerializedMsg = NULL;
-    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor);
+    void *deserializedMsg = NULL;
+    bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, header->major, header->minor);
     if (validVersion) {
         struct iovec deSerializeBuffer;
-        deSerializeBuffer.iov_base = (void *)payload;
-        deSerializeBuffer.iov_len  = payloadSize;
-        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
+        deSerializeBuffer.iov_base = (void*)payload;
+        deSerializeBuffer.iov_len = payloadSize;
+        celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId,
+                                                                     header->major,
+                                                                     header->minor,
+                                                                     &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
-            hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-            bool release = true;
-            while (hashMapIterator_hasNext(&iter)) {
-                pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release);
-                if (!release && hashMapIterator_hasNext(&iter)) {
-                    //receive function has taken ownership and still more receive function to come ..
-                    //deserialize again for new message
-                    status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
-                    if (status != CELIX_SUCCESS) {
-                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                        break;
-                    }
-                    release = true;
+            celix_properties_t *metadata = NULL; //NOTE metadata not supported for websocket
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, header->fqn, msgId,
+                                                                  deserializedMsg, &metadata);
+            if (cont) {
+                bool release;
+                callReceivers(receiver, msgId, header, payload, payloadSize, &deserializedMsg, &release, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, metadata);
+                if (release) {
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg);
                 }
             }
-            if (release) {
-                pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg);
-            }
         } else {
-            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
         }
     } else {
-        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
-               hdr->id,
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+               header->fqn,
                pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
-               (int)hdr->major,
-               (int)hdr->minor,
+               (int) header->major,
+               (int) header->minor,
                pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
                pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId));
     }
 }
 
-static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
+static void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
     json_error_t error;
     json_t *jsMsg = json_loadb(msg, msgSize, 0, &error);
-    if(jsMsg != NULL) {
-        json_t *jsId = json_object_get(jsMsg, "id");
+    if (jsMsg != NULL) {
+        json_t *jsId = json_object_get(jsMsg, "id"); //NOTE called id, but is the msgFqn
         json_t *jsMajor = json_object_get(jsMsg, "major");
         json_t *jsMinor = json_object_get(jsMsg, "minor");
         json_t *jsSeqNr = json_object_get(jsMsg, "seqNr");
@@ -511,24 +525,15 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const
 
         if (jsId && jsMajor && jsMinor && jsSeqNr && jsData) {
             pubsub_websocket_msg_header_t hdr;
-            hdr.id = json_string_value(jsId);
+            hdr.fqn = json_string_value(jsId);
             hdr.major = (uint8_t) json_integer_value(jsMajor);
             hdr.minor = (uint8_t) json_integer_value(jsMinor);
             hdr.seqNr = (uint32_t) json_integer_value(jsSeqNr);
-            const char *payload = json_dumps(jsData, 0);
+            char *payload = json_dumps(jsData, 0);
             size_t payloadSize = strlen(payload);
-            printf("Received msg: id %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.id, hdr.major, hdr.minor, hdr.seqNr, payload);
-
-            celixThreadMutex_lock(&receiver->subscribers.mutex);
-            hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-                if (entry != NULL) {
-                    processMsgForSubscriberEntry(receiver, entry, &hdr, payload, payloadSize);
-                }
-            }
-            celixThreadMutex_unlock(&receiver->subscribers.mutex);
-            free((void *) payload);
+            L_TRACE("Received msg: fqn %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.fqn, hdr.major, hdr.minor, hdr.seqNr, payload);
+            processJsonMsg(receiver, &hdr, payload, payloadSize);
+            free(payload);
         } else {
             L_WARN("[PSA_WEBSOCKET_TR] Received unsupported message: "
                    "ID = %s, major = %d, minor = %d, seqNr = %d, data valid? %s",
@@ -539,9 +544,7 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const
         json_decref(jsMsg);
     } else {
         L_WARN("[PSA_WEBSOCKET_TR] Failed to load websocket JSON message, error line: %d, error message: %s", error.line, error.text);
-        return;
     }
-
 }
 
 static void* psa_websocket_recvThread(void * data) {
@@ -728,20 +731,16 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->initialized) {
-                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                    int rc = 0;
-                    if (svc != NULL && svc->init != NULL) {
-                        rc = svc->init(svc->handle);
-                    }
-                    if (rc == 0) {
-                        //note now only initialized on first subscriber entries added.
-                        entry->initialized = true;
-                    } else {
-                        L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-                        allInitialized = false;
-                    }
+                int rc = 0;
+                if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) {
+                    rc = entry->subscriberSvc->init(entry->subscriberSvc->handle);
+                }
+                if (rc == 0) {
+                    //note now only initialized on first subscriber entries added.
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
                 }
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index adc5ffe..e435093 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -33,6 +33,7 @@
 #include "http_admin/api.h"
 #include "civetweb.h"
 #include "pubsub_websocket_admin.h"
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 
@@ -56,6 +57,7 @@ struct pubsub_websocket_topic_sender {
     char *uri;
 
     pubsub_serializer_handler_t* serializerHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     int seqNr; //atomic
 
@@ -102,6 +104,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create(
     sender->ctx = ctx;
     sender->logHelper = logHelper;
     sender->serializerHandler = serializerHandler;
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE, pubsub_serializerHandler_getSerializationType(serializerHandler));
 
     psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
     sender->uri = psa_websocket_createURI(scope, topic);
@@ -177,6 +180,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         if (sender->scope != NULL) {
             free(sender->scope);
         }
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         free(sender->topic);
         free(sender->uri);
         free(sender);
@@ -263,6 +267,11 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
         return status;
     }
 
+    bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+    if (!cont) {
+        L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        return status;
+    }
 
     if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
@@ -306,6 +315,8 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType
     	status = CELIX_SUCCESS; // Not an error, just nothing to do
     }
 
+    pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+
     return status;
 }
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index d6c5805..f5e70b0 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -64,13 +64,13 @@
 struct pubsub_zmq_topic_receiver {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
     char *scope;
     char *topic;
 
+    pubsub_serializer_handler_t* serializerHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
 
     void *zmqCtx;
@@ -466,17 +466,14 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_prot
                                                                      message->header.msgMinorVersion,
                                                                      &deSerializeBuffer, 0, &deserializedMsg);
         if (status == CELIX_SUCCESS) {
-            uint32_t msgId = message->header.msgId;
             celix_properties_t *metadata = message->metadata.metadata;
-            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId,
-                                                                  deserializedMsg, &metadata);
+            bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, &metadata);
             if (cont) {
                 bool release;
                 callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata);
-                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
+                pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, metadata);
                 if (release) {
-                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId,
-                                                                 deserializedMsg);
+                    pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
                 }
             }
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index d1f36a5..155cb19 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -52,13 +52,13 @@
 struct pubsub_zmq_topic_sender {
     celix_bundle_context_t *ctx;
     celix_log_helper_t *logHelper;
-    pubsub_serializer_handler_t* serializerHandler;
     void *admin;
     long protocolSvcId;
     pubsub_protocol_service_t *protocol;
     uuid_t fwUUID;
     bool zeroCopyEnabled;
 
+    pubsub_serializer_handler_t* serializerHandler;
     pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;

Mime
View raw message