celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 01/11: Add Message segmentation
Date Mon, 12 Oct 2020 19:18:56 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 63fe56e1231146e446b8801e9a72f46f5bfc67e3
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Tue Jun 23 21:53:13 2020 +0200

    Add Message segmentation
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 241 +++++++++++++--------
 1 file changed, 149 insertions(+), 92 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 3bb31cd..9ef6361 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -78,6 +78,7 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
+    unsigned int maxMsgSize;
     unsigned int syncSize;
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
@@ -117,6 +118,7 @@ struct pubsub_tcpHandler {
     pubsub_protocol_service_t *protocol;
     unsigned int bufferSize;
     unsigned int maxNofBuffer;
+    unsigned int maxMsgSize;
     unsigned int maxSendRetryCount;
     unsigned int maxRcvRetryCount;
     double sendTimeout;
@@ -331,6 +333,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         entry->headerBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
+        entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX;
         handle->protocol->getFooterSize(handle->protocol->handle, &size);
         entry->footerSize = size;
         entry->bufferSize = handle->bufferSize;
@@ -658,6 +661,20 @@ int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
 }
 
 //
+// Setup receive buffer size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxMsgSize = size;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+    return 0;
+}
+
+
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int maxMsgSize);
+//
 // Setup thread timeout
 //
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle,
@@ -878,7 +895,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
                     unsigned int size = entry->header.header.payloadPartSize;
                     // For header less messages adjust offset and msg size;
                     if (!entry->headerBufferSize) {
-                        offset = entry->headerSize;
+                        offset += entry->headerSize;
                         size -= offset;
                     }
                     // Read payload data from queue
@@ -923,7 +940,9 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     if (nbytes > 0) {
         entry->retryCount = 0;
         // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
+        if ((entry->bufferReadSize >= entry->header.header.payloadSize) &&
+             validMsg &&
+             entry->header.header.isLastSegment) {
             entry->bufferReadSize = 0;
             pubsub_tcpHandler_decodePayload(handle, entry);
         }
@@ -1026,6 +1045,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
     int nofConnToClose = 0;
     if (handle) {
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2;
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
@@ -1052,6 +1072,16 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                                                  &metadataSize);
             }
             message->header.metadataSize = metadataSize;
+            size_t totalMessageSize = payloadSize + metadataSize;
+
+            bool isMessageSegmentationSupported = false;
+            handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
+            if (((!isMessageSegmentationSupported) && (msg_iov_len > max_msg_iov_len)) ||
+                ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) {
+                L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n",
+                       entry->fd);
+                continue;
+            }
 
             void *footerData = NULL;
             size_t footerDataSize = 0;
@@ -1061,107 +1091,134 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                                                  &footerDataSize);
             }
 
+
             size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+            size_t msgIovLen = 0;
+            long int nbytes = UINT32_MAX;
+            while (msgSize < totalMessageSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                // Write generic seralized payload in vector buffer
+                if (payloadSize && payloadData) {
+                    char *payloadDataBuffer = payloadData;
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize];
+                    msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize);
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset = msgSize;
                     msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                } else {
+                    // copy serialized vector into vector buffer
+                    for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
+                        msg.msg_iovlen++;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
+                        msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
+                        if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
+                            break;
+                        msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset = msgSize;
+                    msgSize += msgPartSize;
+                    msgIovLen += (msg.msg_iovlen - 1);
                 }
-            }
 
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                // Write optional metadata in vector buffer
+                if ((msgSize < (payloadSize + metadataSize)) &&
+                    (msgPartSize < entry->maxMsgSize) &&
+                    (metadataSize && metadataData)) {
+                    msg.msg_iovlen++;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    message->header.metadataSize = metadataSize;
+                    msgSize += metadataSize;
+                }
+                if (msgSize >= totalMessageSize) {
+                    message->header.isLastSegment = 0x1;
+                }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                // Write optional footerData in vector buffer
+                if (footerData && footerDataSize) {
+                    msg.msg_iovlen++;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // check if header is not part of the payload (=> headerBufferSize = 0)s
+                if (entry->headerBufferSize) {
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message,
+                                                   &headerData,
+                                                   &headerSize);
+                }
+                if (!entry->headerBufferSize) {
+                    // Skip header buffer, when header is part of payload;
+                    msg.msg_iov = &msg_iov[1];
+                } else if (headerSize && headerData) {
+                    // Write header in 1st vector buffer item
+                    msg.msg_iov[0].iov_base = headerData;
+                    msg.msg_iov[0].iov_len = headerSize;
+                    msgPartSize += msg.msg_iov[0].iov_len;
+                    msg.msg_iovlen++;
                 } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    L_ERROR("[TCP Socket] No header buffer is generated");
+                    msg.msg_iovlen = 0;
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
+                            entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+                    }
+                }
+                // Release data
+                if (headerData) {
+                    free(headerData);
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
+                }
+                if (metadataData) {
+                    free(metadataData);
+                }
+                if (footerData) {
+                    free(footerData);
                 }
             }
-            // Release data
-            if (headerData) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData) {
-                free(metadataData);
-            }
-            if (footerData) {
-                free(footerData);
-            }
+
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);


Mime
View raw message