celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [celix] 03/05: Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2)
Date Mon, 26 Jul 2021 18:55:03 GMT
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 3f0014b778b9abb3a2df04e9eca50eee5eca0fb0
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
AuthorDate: Mon Jul 26 19:04:19 2021 +0200

    Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2)
---
 .../v1/src/pubsub_tcp_topic_receiver.c             |  54 ++---
 .../v1/src/pubsub_tcp_topic_sender.c               |  87 ++++----
 .../v1/src/pubsub_zmq_topic_receiver.c             |  45 ++--
 .../v1/src/pubsub_zmq_topic_sender.c               | 232 ++++++++++-----------
 4 files changed, 182 insertions(+), 236 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
index 4178b50..aeda1c3 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c
@@ -34,7 +34,6 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
-#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
 #ifndef UUID_STR_LEN
@@ -64,7 +63,6 @@ struct pubsub_tcp_topic_receiver {
     bool isPassive;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
-    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -144,7 +142,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic,
PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR,
topic, scope);
     const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED,
topic, scope);
     const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY,
topic, scope);
@@ -322,7 +319,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver)
{
             pubsub_tcpHandler_destroy(receiver->socketHandler);
             receiver->socketHandler = NULL;
         }
-        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -549,39 +545,31 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver,
psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                const char *msgType = msgSer->msgName;
-                uint32_t msgId = message->header.msgId;
-                celix_properties_t *metadata = message->metadata.metadata;
-                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler,
msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                if (cont) {
-                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
-                    while (hashMapIterator_hasNext(&iter)) {
-                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deSerializedMsg, message->metadata.metadata, &release);
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
msgType, msgId, deSerializedMsg, metadata);
-                        if (!release) {
-                            //receive function has taken ownership, deserialize again for
new message
-                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer,
1, &deSerializedMsg);
-                            if (status != CELIX_SUCCESS) {
-                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic
%s/%s",
-                                       msgSer->msgName,
-                                       receiver->scope == NULL ? "(null)" : receiver->scope,
-                                       receiver->topic);
-                                break;
-                            }
-                            release = true;
+                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deSerializedMsg, message->metadata.metadata, &release);
+                    if (!release) {
+                        //receive function has taken ownership, deserialize again for new
message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer,
1, &deSerializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic
%s/%s",
+                                   msgSer->msgName,
+                                   receiver->scope == NULL ? "(null)" : receiver->scope,
+                                   receiver->topic);
+                            break;
                         }
+                        release = true;
                     }
-                    pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
msgType, msgId, deSerializedMsg, metadata);
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
-                    }
-                    if (message->metadata.metadata) {
-                        celix_properties_destroy(message->metadata.metadata);
-                    }
-                    updateReceiveCount += 1;
                 }
+                if (release) {
+                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                }
+                if (message->metadata.metadata) {
+                    celix_properties_destroy(message->metadata.metadata);
+                }
+                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
msgSer->msgName,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
index 32a3328..7bea628 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c
@@ -34,7 +34,6 @@
 #include <uuid/uuid.h>
 #include "celix_constants.h"
 #include <pubsub_utils.h>
-#include "pubsub_interceptors_handler.h"
 
 #define TCP_BIND_MAX_RETRY                      10
 
@@ -58,7 +57,6 @@ struct pubsub_tcp_topic_sender {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
-    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
@@ -145,7 +143,6 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic,
PUBSUB_TCP_ADMIN_TYPE, "*unknown*");
     sender->isPassive = false;
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
@@ -303,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender)
{
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
-        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL))
{
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -531,58 +527,47 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const
void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        bool cont = false;
-        if (status == CELIX_SUCCESS) /*ser ok*/ {
-            cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler,
entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        pubsub_protocol_message_t message;
+        message.metadata.metadata = NULL;
+        message.payload.payload = NULL;
+        message.payload.length = 0;
+        if (serializedIoVecOutput) {
+            message.payload.payload = serializedIoVecOutput->iov_base;
+            message.payload.length = serializedIoVecOutput->iov_len;
         }
-        if (cont) {
-            pubsub_protocol_message_t message;
-            message.metadata.metadata = NULL;
-            message.payload.payload = NULL;
-            message.payload.length = 0;
-            if (serializedIoVecOutput) {
-                message.payload.payload = serializedIoVecOutput->iov_base;
-                message.payload.length = serializedIoVecOutput->iov_len;
+        message.header.msgId = msgTypeId;
+        message.header.seqNr = entry->seqNr;
+        message.header.msgMajorVersion = entry->major;
+        message.header.msgMinorVersion = entry->minor;
+        message.header.payloadSize = 0;
+        message.header.payloadPartSize = 0;
+        message.header.payloadOffset = 0;
+        message.header.metadataSize = 0;
+        if (metadata != NULL)
+            message.metadata.metadata = metadata;
+        entry->seqNr++;
+        bool sendOk = true;
+        {
+            int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
serializedIoVecOutputLen, 0);
+            if (rc < 0) {
+                status = -1;
+                sendOk = false;
             }
-            message.header.msgId = msgTypeId;
-            message.header.seqNr = entry->seqNr;
-            message.header.msgMajorVersion = entry->major;
-            message.header.msgMinorVersion = entry->minor;
-            message.header.payloadSize = 0;
-            message.header.payloadPartSize = 0;
-            message.header.payloadOffset = 0;
-            message.header.metadataSize = 0;
-            if (metadata != NULL)
-                message.metadata.metadata = metadata;
-            entry->seqNr++;
-            bool sendOk = true;
-            {
-                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message,
serializedIoVecOutput, serializedIoVecOutputLen, 0);
-                if (rc < 0) {
-                    status = -1;
-                    sendOk = false;
-                }
-                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName,
msgTypeId, inMsg, metadata);
-                if (message.metadata.metadata)
-                    celix_properties_destroy(message.metadata.metadata);
-                if (serializedIoVecOutput) {
-                    entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
-                                                    serializedIoVecOutput,
-                                                    serializedIoVecOutputLen);
-                    serializedIoVecOutput = NULL;
-                }
+            if (message.metadata.metadata)
+                celix_properties_destroy(message.metadata.metadata);
+            if (serializedIoVecOutput) {
+                entry->msgSer->freeSerializeMsg(entry->msgSer->handle,
+                                                serializedIoVecOutput,
+                                                serializedIoVecOutputLen);
+                serializedIoVecOutput = NULL;
             }
+        }
 
-            if (sendOk) {
-                sendCountUpdate = 1;
-            } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno));
-            }
+        if (sendOk) {
+            sendCountUpdate = 1;
         } else {
-            serializationErrorUpdate = 1;
-            L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s",
entry->msgSer->msgName,
-                   sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+            sendErrorUpdate = 1;
+            L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
         //unknownMessageCountUpdate = 1;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
index 62b2fbf..28146af 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c
@@ -39,8 +39,6 @@
 #include <pubsub_utils.h>
 #include <celix_api.h>
 
-#include "pubsub_interceptors_handler.h"
-
 #include "celix_utils_api.h"
 
 #define PSA_ZMQ_RECV_TIMEOUT 1000
@@ -70,8 +68,6 @@ struct pubsub_zmq_topic_receiver {
     char *topic;
     bool metricsEnabled;
 
-    pubsub_interceptors_handler_t *interceptorsHandler;
-
     void *zmqCtx;
     void *zmqSock;
 
@@ -157,8 +153,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED,
PSA_ZMQ_DEFAULT_METRICS_ENABLED);
 
-    receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic,
PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
-
 #ifdef BUILD_WITH_ZMQ_SECURITY
     char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
     if (keys_bundle_dir == NULL) {
@@ -334,8 +328,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver)
{
         zmq_close(receiver->zmqSock);
         zmq_ctx_term(receiver->zmqCtx);
 
-        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
-
         free(receiver->scope);
         free(receiver->topic);
     }
@@ -522,33 +514,26 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t
*rec
                 clock_gettime(CLOCK_REALTIME, &endSer);
             }
             if (status == CELIX_SUCCESS) {
-
-                const char *msgType = msgSer->msgName;
-                uint32_t msgId = message->header.msgId;
                 celix_properties_t *metadata = message->metadata.metadata;
-                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler,
msgType, msgId, deserializedMsg, &metadata);
                 bool release = true;
-                if (cont) {
-                    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
-                    while (hashMapIterator_hasNext(&iter2)) {
-                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
-                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deserializedMsg, metadata, &release);
-                        if (!release) {
-                            //receive function has taken ownership deserialize again for
new message
-                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer,
0, &deserializedMsg);
-                            if (status != CELIX_SUCCESS) {
-                                L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic
%s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                                break;
-                            }
-                            release = true;
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deserializedMsg, metadata, &release);
+                    if (!release) {
+                        //receive function has taken ownership deserialize again for new
message
+                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer,
0, &deserializedMsg);
+                        if (status != CELIX_SUCCESS) {
+                            L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic
%s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+                            break;
                         }
-                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
msgType, msgId, deserializedMsg, metadata);
+                        release = true;
                     }
-                    if (release) {
-                        msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
-                    }
-                    updateReceiveCount += 1;
                 }
+                if (release) {
+                    msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg);
+                }
+                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
index aba8893..19c4660 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c
@@ -32,7 +32,6 @@
 #include "pubsub_psa_zmq_constants.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
-#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS             2
 #define ZMQ_BIND_MAX_RETRY                      10
@@ -57,8 +56,6 @@ struct pubsub_zmq_topic_sender {
     bool metricsEnabled;
     bool zeroCopyEnabled;
 
-    pubsub_interceptors_handler_t *interceptorsHandler;
-
     char *scope;
     char *topic;
     char *url;
@@ -157,8 +154,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED,
PSA_ZMQ_DEFAULT_METRICS_ENABLED);
     sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED,
PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
 
-    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic,
PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*");
-
     //setting up zmq socket for ZMQ TopicSender
     {
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -347,8 +342,6 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender)
{
 
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
-        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
-
         if (sender->scope != NULL) {
             free(sender->scope);
         }
@@ -573,138 +566,133 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int
msgTypeId, co
                 usleep(500);
             }
 
-            bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler,
entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
-            if (cont) {
+            pubsub_protocol_message_t message;
+            message.payload.payload = serializedOutput->iov_base;
+            message.payload.length = serializedOutput->iov_len;
 
-                pubsub_protocol_message_t message;
-                message.payload.payload = serializedOutput->iov_base;
-                message.payload.length = serializedOutput->iov_len;
+            void *payloadData = NULL;
+            size_t payloadLength = 0;
+            entry->protSer->encodePayload(entry->protSer->handle, &message,
&payloadData, &payloadLength);
 
-                void *payloadData = NULL;
-                size_t payloadLength = 0;
-                entry->protSer->encodePayload(entry->protSer->handle, &message,
&payloadData, &payloadLength);
+            if (metadata != NULL) {
+                message.metadata.metadata = metadata;
+                entry->protSer->encodeMetadata(entry->protSer->handle, &message,
&entry->metadataBuffer, &entry->metadataBufferSize);
+            } else {
+                message.metadata.metadata = NULL;
+            }
 
-                if (metadata != NULL) {
-                    message.metadata.metadata = metadata;
-                    entry->protSer->encodeMetadata(entry->protSer->handle, &message,
&entry->metadataBuffer, &entry->metadataBufferSize);
-                } else {
-                    message.metadata.metadata = NULL;
+            entry->protSer->encodeFooter(entry->protSer->handle, &message,
&entry->footerBuffer, &entry->footerBufferSize);
+
+            message.header.msgId = msgTypeId;
+            message.header.seqNr = entry->seqNr;
+            message.header.msgMajorVersion = 0;
+            message.header.msgMinorVersion = 0;
+            message.header.payloadSize = payloadLength;
+            message.header.metadataSize = entry->metadataBufferSize;
+            message.header.payloadPartSize = payloadLength;
+            message.header.payloadOffset = 0;
+            message.header.isLastSegment = 1;
+            message.header.convertEndianess = 0;
+
+            // increase seqNr
+            entry->seqNr++;
+
+            entry->protSer->encodeHeader(entry->protSer->handle, &message,
&entry->headerBuffer, &entry->headerBufferSize);
+
+            errno = 0;
+            bool sendOk;
+
+            if (bound->parent->zeroCopyEnabled) {
+
+                zmq_msg_t msg1; // Header
+                zmq_msg_t msg2; // Payload
+                zmq_msg_t msg3; // Metadata
+                zmq_msg_t msg4; // Footer
+                void *socket = zsock_resolve(sender->zmq.socket);
+                psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
+                freeMsgEntry->msgSer = entry->msgSer;
+                freeMsgEntry->serializedOutput = serializedOutput;
+                freeMsgEntry->serializedOutputLen = serializedOutputLen;
+
+                zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize,
psa_zmq_unlockData, entry);
+                //send header
+                int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+                if (rc == -1) {
+                    L_WARN("Error sending header msg. %s", strerror(errno));
+                    zmq_msg_close(&msg1);
                 }
 
-                entry->protSer->encodeFooter(entry->protSer->handle, &message,
&entry->footerBuffer, &entry->footerBufferSize);
-
-                message.header.msgId = msgTypeId;
-                message.header.seqNr = entry->seqNr;
-                message.header.msgMajorVersion = 0;
-                message.header.msgMinorVersion = 0;
-                message.header.payloadSize = payloadLength;
-                message.header.metadataSize = entry->metadataBufferSize;
-                message.header.payloadPartSize = payloadLength;
-                message.header.payloadOffset = 0;
-                message.header.isLastSegment = 1;
-                message.header.convertEndianess = 0;
-
-                // increase seqNr
-                entry->seqNr++;
-
-                entry->protSer->encodeHeader(entry->protSer->handle, &message,
&entry->headerBuffer, &entry->headerBufferSize);
-
-                errno = 0;
-                bool sendOk;
-
-                if (bound->parent->zeroCopyEnabled) {
-
-                    zmq_msg_t msg1; // Header
-                    zmq_msg_t msg2; // Payload
-                    zmq_msg_t msg3; // Metadata
-                    zmq_msg_t msg4; // Footer
-                    void *socket = zsock_resolve(sender->zmq.socket);
-                    psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
-                    freeMsgEntry->msgSer = entry->msgSer;
-                    freeMsgEntry->serializedOutput = serializedOutput;
-                    freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
-                    zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize,
psa_zmq_unlockData, entry);
-                    //send header
-                    int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+                //send Payload
+                if (rc > 0) {
+                    int flag = ((entry->metadataBufferSize > 0)  || (entry->footerBufferSize
> 0)) ? ZMQ_SNDMORE : 0;
+                    zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg,
freeMsgEntry);
+                    rc = zmq_msg_send(&msg2, socket, flag);
                     if (rc == -1) {
-                        L_WARN("Error sending header msg. %s", strerror(errno));
-                        zmq_msg_close(&msg1);
-                    }
-
-                    //send Payload
-                    if (rc > 0) {
-                        int flag = ((entry->metadataBufferSize > 0)  || (entry->footerBufferSize
> 0)) ? ZMQ_SNDMORE : 0;
-                        zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg,
freeMsgEntry);
-                        rc = zmq_msg_send(&msg2, socket, flag);
-                        if (rc == -1) {
-                            L_WARN("Error sending payload msg. %s", strerror(errno));
-                            zmq_msg_close(&msg2);
-                        }
-                    }
-
-                    //send MetaData
-                    if (rc > 0 && entry->metadataBufferSize > 0) {
-                        int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
-                        zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize,
NULL, NULL);
-                        rc = zmq_msg_send(&msg3, socket, flag);
-                        if (rc == -1) {
-                            L_WARN("Error sending metadata msg. %s", strerror(errno));
-                            zmq_msg_close(&msg3);
-                        }
-                    }
-
-                    //send Footer
-                    if (rc > 0 && entry->footerBufferSize > 0) {
-                        zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize,
NULL, NULL);
-                        rc = zmq_msg_send(&msg4, socket, 0);
-                        if (rc == -1) {
-                            L_WARN("Error sending footer msg. %s", strerror(errno));
-                            zmq_msg_close(&msg4);
-                        }
-                    }
-
-                    sendOk = rc > 0;
-                } else {
-                    //no zero copy
-                    zmsg_t *msg = zmsg_new();
-                    zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
-                    zmsg_addmem(msg, payloadData, payloadLength);
-                    if (entry->metadataBufferSize > 0) {
-                        zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
-                    }
-                    if (entry->footerBufferSize > 0) {
-                        zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
+                        L_WARN("Error sending payload msg. %s", strerror(errno));
+                        zmq_msg_close(&msg2);
                     }
-                    int rc = zmsg_send(&msg, sender->zmq.socket);
-                    sendOk = rc == 0;
+                }
 
-                    if (!sendOk) {
-                        zmsg_destroy(&msg); //if send was not ok, no owner change ->
destroy msg
+                //send MetaData
+                if (rc > 0 && entry->metadataBufferSize > 0) {
+                    int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+                    zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize,
NULL, NULL);
+                    rc = zmq_msg_send(&msg3, socket, flag);
+                    if (rc == -1) {
+                        L_WARN("Error sending metadata msg. %s", strerror(errno));
+                        zmq_msg_close(&msg3);
                     }
+                }
 
-                    // Note: serialized Payload is deleted by serializer
-                    if (payloadData && (payloadData != message.payload.payload))
{
-                        free(payloadData);
+                //send Footer
+                if (rc > 0 && entry->footerBufferSize > 0) {
+                    zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize,
NULL, NULL);
+                    rc = zmq_msg_send(&msg4, socket, 0);
+                    if (rc == -1) {
+                        L_WARN("Error sending footer msg. %s", strerror(errno));
+                        zmq_msg_close(&msg4);
                     }
-
-                    __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
                 }
-                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName,
msgTypeId, inMsg, metadata);
 
-                if (message.metadata.metadata) {
-                    celix_properties_destroy(message.metadata.metadata);
+                sendOk = rc > 0;
+            } else {
+                //no zero copy
+                zmsg_t *msg = zmsg_new();
+                zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
+                zmsg_addmem(msg, payloadData, payloadLength);
+                if (entry->metadataBufferSize > 0) {
+                    zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
                 }
-                if (!bound->parent->zeroCopyEnabled && serializedOutput) {
-                    entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput,
serializedOutputLen);
+                if (entry->footerBufferSize > 0) {
+                    zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
                 }
+                int rc = zmsg_send(&msg, sender->zmq.socket);
+                sendOk = rc == 0;
 
-                if (sendOk) {
-                    sendCountUpdate = 1;
-                } else {
-                    sendErrorUpdate = 1;
-                    L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
+                if (!sendOk) {
+                    zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy
msg
+                }
+
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message.payload.payload)) {
+                    free(payloadData);
                 }
+
+                __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
+            }
+
+            if (message.metadata.metadata) {
+                celix_properties_destroy(message.metadata.metadata);
+            }
+            if (!bound->parent->zeroCopyEnabled && serializedOutput) {
+                entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput,
serializedOutputLen);
+            }
+
+            if (sendOk) {
+                sendCountUpdate = 1;
+            } else {
+                sendErrorUpdate = 1;
+                L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
             }
         } else {
             serializationErrorUpdate = 1;

Mime
View raw message