celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 09/11: Fix buffers
Date Mon, 12 Oct 2020 19:19:04 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 75794733c84213d2a1c9eb63182bdb79a848db47
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Mon Aug 3 13:50:35 2020 +0200

    Fix buffers
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 458 ++++++++++++++++-----
 1 file changed, 352 insertions(+), 106 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 4734497..aae0406 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -84,7 +84,9 @@ typedef struct psa_tcp_connection_entry {
     socklen_t len;
     bool connected;
     bool headerError;
+    unsigned int state;
     pubsub_protocol_message_t header;
+    unsigned int maxMsgSize;
     unsigned int syncSize;
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
@@ -94,10 +96,11 @@ typedef struct psa_tcp_connection_entry {
     unsigned int bufferSize;
     void *buffer;
     unsigned int bufferReadSize;
+    unsigned int bufferReadReadOffset;
+    unsigned int expectedBufferReadSize;
+    unsigned int msgSizeReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
-    struct msghdr msg;
-    size_t msg_iovlen;        /* Number of elements in the vector.  */
     unsigned int retryCount;
 } psa_tcp_connection_entry_t;
 
@@ -124,6 +127,7 @@ struct pubsub_tcpHandler {
     pubsub_protocol_service_t *protocol;
     unsigned int bufferSize;
     unsigned int maxNofBuffer;
+    unsigned int maxMsgSize;
     unsigned int maxSendRetryCount;
     unsigned int maxRcvRetryCount;
     double sendTimeout;
@@ -132,29 +136,19 @@ struct pubsub_tcpHandler {
     bool running;
 };
 
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
-                              struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
 static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
+static inline int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag );
+static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState);
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
 static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
 static void *pubsub_tcpHandler_thread(void *data);
 
 //
@@ -348,9 +342,6 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         }
         if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
-        entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
-        entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
-        entry->msg_iovlen++;
     }
     return entry;
 }
@@ -458,6 +449,13 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
                 entry = NULL;
             }
+            //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            //if (rc < 0) {
+            //    pubsub_tcpHandler_freeEntry(entry);
+            //    L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
+            //    entry = NULL;
+           // }
+
         }
         if ((rc >= 0) && (entry)) {
             celixThreadRwlock_writeLock(&handle->dbLock);
@@ -565,7 +563,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
     else {
         rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
         }
     }
     return rc;
@@ -774,7 +772,25 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
 }
 
 static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) {
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
+    int expectedReadSize = size;
+    int nbytes = size;
+    int msgSize = 0;
+    char* buffer = (char*)_buffer;
+    while (nbytes > 0 && expectedReadSize > 0) {
+        // Read the message header
+        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
+        // Update buffer administration
+        offset += nbytes;
+        expectedReadSize -= nbytes;
+        msgSize += nbytes;
+    }
+    if (nbytes <=0)  msgSize = nbytes;
+    return msgSize;
+}
+
+static inline
+int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) {
     int nbytes = entry->expectedBufferReadSize;
     char* buffer = (char*)_buffer;
     while (nbytes > 0 && entry->expectedBufferReadSize > 0) {
@@ -811,6 +827,88 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
   }
 }
 
+  static inline
+  void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+      entry->bufferReadReadOffset = 0;
+      if (entry->state == READ_STATE_SYNC) {
+        entry->expectedBufferReadSize = entry->headerSize;
+        entry->state = READ_STATE_HEADER;
+      } else if (entry->state == READ_STATE_HEADER) {
+        if (entry->header.header.payloadSize) {
+            entry->state = READ_STATE_PAYLOAD;
+            entry->bufferReadReadOffset = entry->header.header.payloadOffset;
+            entry->expectedBufferReadSize = entry->header.header.payloadSize;
+            // For header less messages adjust offset and msg size;
+            if (!entry->headerBufferSize) {
+                entry->bufferReadReadOffset += entry->headerSize;
+                entry->expectedBufferReadSize -= entry->headerSize;
+            }
+        } else if (entry->header.header.metadataSize) {
+            entry->state = READ_STATE_META;
+            entry->expectedBufferReadSize = entry->header.header.metadataSize;
+        } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) {
+            if (entry->footerSize) {
+                entry->state = READ_STATE_FOOTER;
+                entry->expectedBufferReadSize = entry->footerSize;
+            } else if (entry->header.header.isLastSegment) {
+                entry->state = READ_STATE_READY;
+                entry->expectedBufferReadSize = 0;
+            } else {
+                entry->state = READ_STATE_HEADER;
+                entry->expectedBufferReadSize = entry->headerSize;
+            }
+        }
+      } else if (entry->state == READ_STATE_PAYLOAD) {
+        if (entry->header.header.metadataSize) {
+            entry->state = READ_STATE_META;
+            entry->expectedBufferReadSize = entry->header.header.metadataSize;
+        } else  {
+            if (entry->footerSize) {
+                entry->state = READ_STATE_FOOTER;
+                entry->expectedBufferReadSize = entry->footerSize;
+            } else if (entry->header.header.isLastSegment) {
+                entry->state = READ_STATE_READY;
+                entry->expectedBufferReadSize = 0;
+            } else {
+                entry->state = READ_STATE_HEADER;
+                entry->expectedBufferReadSize = entry->headerSize;
+            }
+        }
+      } else if (entry->state == READ_STATE_META) {
+        if (entry->footerSize) {
+            entry->state = READ_STATE_FOOTER;
+            entry->expectedBufferReadSize = entry->footerSize;
+        } else if (entry->header.header.isLastSegment) {
+            entry->state = READ_STATE_READY;
+            entry->expectedBufferReadSize = 0;
+        } else {
+            entry->state = READ_STATE_HEADER;
+            entry->expectedBufferReadSize = entry->headerSize;
+        }
+      } else if (entry->state == READ_STATE_FOOTER) {
+        if (entry->header.header.isLastSegment) {
+            entry->state = READ_STATE_READY;
+        } else {
+            entry->state = READ_STATE_HEADER;
+            entry->expectedBufferReadSize = entry->headerSize;
+        }
+      } else if (entry->state == READ_STATE_READY) {
+          entry->state = READ_STATE_HEADER;
+          entry->expectedBufferReadSize = entry->headerSize;
+      }
+}
+static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState){
+    entry->bufferReadReadOffset = 0;
+    if (nextState == READ_STATE_SYNC) {
+        entry->expectedBufferReadSize = entry->syncSize;
+        entry->state = nextState;
+    } else if (nextState == READ_STATE_HEADER) {
+        entry->expectedBufferReadSize = entry->headerSize;
+        entry->state = nextState;
+    }
+}
+
+
 
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
@@ -836,6 +934,144 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
         if (entry->buffer) free(entry->buffer);
+        entry->buffer = malloc((size_t) handle->bufferSize);
+        entry->bufferSize = handle->bufferSize;
+    }
+    // Read the message
+    bool validMsg = false;
+    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
+    if (nbytes > 0) {
+        // Check header message buffer
+        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
+            if (!entry->headerError) {
+                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            }
+            entry->headerError = true;
+            entry->bufferReadSize = 0;
+        } else {
+            // Read header message from queue
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
+            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                entry->headerError = false;
+                // For headerless message, add header to bufferReadSize;
+                if (!entry->headerBufferSize)
+                    entry->bufferReadSize += nbytes;
+                // Alloc message buffers
+                if (entry->header.header.payloadSize > entry->bufferSize) {
+                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                    if (entry->buffer)
+                        free(entry->buffer);
+                    entry->buffer = malloc((size_t) handle->bufferSize);
+                    entry->bufferSize = handle->bufferSize;
+                }
+                if (entry->header.header.metadataSize > entry->metaBufferSize) {
+                    if (entry->metaBuffer) {
+                        free(entry->metaBuffer);
+                    }
+                    entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+                    entry->metaBufferSize = entry->header.header.metadataSize;
+                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                           entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+                }
+
+                if (entry->header.header.payloadSize) {
+                    unsigned int offset = entry->header.header.payloadOffset;
+                    unsigned int size = entry->header.header.payloadPartSize;
+                    // For header less messages adjust offset and msg size;
+                    if (!entry->headerBufferSize) {
+                        offset = entry->headerSize;
+                        size -= offset;
+                    }
+                    // Read payload data from queue
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
+                    if (nbytes > 0) {
+                        if (nbytes == size) {
+                            entry->bufferReadSize += nbytes;
+                        } else {
+                            entry->bufferReadSize = 0;
+                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                        }
+                    }
+                }
+                if (nbytes > 0 && entry->header.header.metadataSize) {
+                    // Read meta data from queue
+                    unsigned int size = entry->header.header.metadataSize;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
+                    if ((nbytes > 0) && (nbytes != size)) {
+                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                    }
+                }
+                // Check for end of message using, footer of message. Because of streaming protocol
+                if (nbytes > 0) {
+                    if (entry->footerSize > 0) {
+                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
+                        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
+                            // valid footer, this means that the message is valid
+                            validMsg = true;
+                        } else {
+                            // Did not receive correct header
+                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                            entry->bufferReadSize = 0;
+                        }
+                    } else {
+                        // No Footer, then complete message is received
+                        validMsg = true;
+                    }
+                }
+            }
+        }
+    }
+    if (nbytes > 0) {
+        entry->retryCount = 0;
+        // Check if complete message is received
+        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
+            entry->bufferReadSize = 0;
+            pubsub_tcpHandler_decodePayload(handle, entry);
+        }
+    } else {
+        if (entry->retryCount < handle->maxRcvRetryCount) {
+            entry->retryCount++;
+            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
+                   strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+        } else {
+            L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
+                    entry->fd, handle->maxRcvRetryCount, strerror(errno));
+            nbytes = 0; //Return 0 as indicator to close the connection
+        }
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+    return nbytes;
+}
+
+
+//
+// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
+// If the message is completely reassembled true is returned and the index and size have valid values
+//
+int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
+    if (entry == NULL)
+        entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+    // Find FD entry
+    if (entry == NULL) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return -1;
+    }
+    // If it's not connected return from function
+    if (!entry->connected) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return -1;
+    }
+
+    // Message buffer is to small, reallocate to make it bigger
+    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
+        if (entry->buffer) free(entry->buffer);
             entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
         }
@@ -843,13 +1079,13 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     long int nbytes = 0;
     char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
     if (entry->state == READ_STATE_SYNC) {
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0);
         if (nbytes > 0) {
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
     if (entry->state == READ_STATE_HEADER) {
-      nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
+      nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, MSG_PEEK);
       if (nbytes >= entry->headerSize) { // Check header message buffer
           if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
               // Did not receive correct header
@@ -862,7 +1098,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
           } else {
               // Read header message from queue
               pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
-              nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+              nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0);
               if ((nbytes > 0) && (nbytes == entry->headerSize)) {
                  entry->headerError = false;
                  entry->msgSizeReadSize = 0;
@@ -885,7 +1121,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         }
 
         // Read payload data from queue
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
+        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->buffer, 0);
         if (nbytes > 0) {
             if (nbytes >= entry->header.header.payloadPartSize) {
                 entry->msgSizeReadSize += nbytes;
@@ -906,7 +1142,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         }
 
         // Read meta data from (queue
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0);
+        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->metaBuffer,0);
         if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) {
             entry->msgSizeReadSize += nbytes;
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
@@ -914,7 +1150,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     }
     if (entry->state == READ_STATE_FOOTER) {
         // Check for end of message using, footer of message. Because of streaming protocol
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0);
+        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->footerBuffer, 0);
         if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
             // valid footer, this means that the message is valid
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
@@ -1034,6 +1270,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
     int nofConnToClose = 0;
     if (handle) {
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2;
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
@@ -1055,9 +1292,11 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
+                metadataData = entry->metaBuffer;
                 handle->protocol->encodeMetadata(handle->protocol->handle, message,
                                                  &metadataData,
                                                  &metadataSize);
+                entry->metaBufferSize = metadataSize;
             }
             message->header.metadataSize = metadataSize;
             size_t totalMessageSize = payloadSize + metadataSize;
@@ -1074,12 +1313,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *footerData = NULL;
             size_t footerDataSize = 0;
             if (entry->footerSize) {
+                footerData = entry->footerBuffer;
                 handle->protocol->encodeFooter(handle->protocol->handle, message,
                                                  &footerData,
                                                  &footerDataSize);
+                entry->footerSize = footerDataSize;
             }
 
-
             size_t msgSize = 0;
             size_t msgIovLen = 0;
             long int nbytes = UINT32_MAX;
@@ -1098,33 +1338,35 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 message->header.isLastSegment = 0;
 
                 // Write generic seralized payload in vector buffer
-                if (payloadSize && payloadData) {
-                    char *payloadDataBuffer = payloadData;
-                    msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize];
-                    msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize);
-                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-                    message->header.payloadPartSize = msgPartSize;
-                    message->header.payloadOffset = msgSize;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-                } else {
-                    // copy serialized vector into vector buffer
-                    for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
+                if (msgSize < payloadSize) {
+                    if (payloadSize && payloadData) {
+                        char *payloadDataBuffer = payloadData;
                         msg.msg_iovlen++;
-                        msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
-                        msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
-                        if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
-                            break;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize);
                         msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        message->header.payloadPartSize = msgPartSize;
+                        message->header.payloadOffset = msgSize;
+                        msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    } else {
+                        // copy serialized vector into vector buffer
+                        for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
+                            msg.msg_iovlen++;
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
+                            if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
+                                break;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        }
+                        message->header.payloadPartSize = msgPartSize;
+                        message->header.payloadOffset = msgSize;
+                        msgSize += msgPartSize;
+                        msgIovLen += (msg.msg_iovlen - 1);
                     }
-                    message->header.payloadPartSize = msgPartSize;
-                    message->header.payloadOffset = msgSize;
-                    msgSize += msgPartSize;
-                    msgIovLen += (msg.msg_iovlen - 1);
                 }
 
                 // Write optional metadata in vector buffer
-                if ((msgSize < (payloadSize + metadataSize)) &&
+                if ((msgSize >= payloadSize) &&
                     (msgPartSize < entry->maxMsgSize) &&
                     (metadataSize && metadataData)) {
                     msg.msg_iovlen++;
@@ -1146,65 +1388,67 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                 }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // check if header is not part of the payload (=> headerBufferSize = 0)s
+                if (entry->headerBufferSize) {
+                    headerData = entry->headerBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message,
+                                                   &headerData,
+                                                   &headerSize);
+                    entry->headerBufferSize = headerSize;
+                }
+                if (!entry->headerBufferSize) {
+                    // Skip header buffer, when header is part of payload;
+                    msg.msg_iov = &msg_iov[1];
+                } else if (headerSize && headerData) {
+                    // Write header in 1st vector buffer item
+                    msg.msg_iov[0].iov_base = headerData;
+                    msg.msg_iov[0].iov_len = headerSize;
+                    msgPartSize += msg.msg_iov[0].iov_len;
+                    msg.msg_iovlen++;
                 } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    L_ERROR("[TCP Socket] No header buffer is generated");
+                    msg.msg_iovlen = 0;
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
+                            entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+                    }
+                }
+                // Release data
+                if (headerData && headerData != entry->headerBuffer) {
+                    free(headerData);
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
+                }
+                if (metadataData && metadataData != entry->metaBuffer) {
+                    free(metadataData);
+                }
+                if (footerData && footerData != entry->footerBuffer) {
+                    free(footerData);
                 }
-            }
-            // Release data
-            if (headerData) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData) {
-                free(metadataData);
-            }
-            if (footerData) {
-                free(footerData);
             }
         }
     }
@@ -1216,6 +1460,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
     return result;
 }
 
+
+
 //
 // get interface URL
 //


Mime
View raw message