celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 11/11: Add
Date Mon, 12 Oct 2020 19:18:42 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git

commit c6c7131cbf6a4629ff5d8fb86a750a9659de28ed
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Wed Aug 12 21:59:22 2020 +0200

    Add
---
 CMakeLists.txt                                     |   2 +-
 .../publisher/private/src/pubsub_publisher.c       |  24 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 469 +++++++++++++++------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   1 +
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   1 +
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |   2 +
 .../src/pubsub_wire_protocol_common.c              |   6 +-
 .../src/pubsub_wire_v2_protocol_impl.c             |   6 +-
 .../pubsub/pubsub_spi/include/pubsub_protocol.h    |   4 +-
 9 files changed, 364 insertions(+), 151 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9ede833..788200f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,7 +47,7 @@ set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
 
 # Set C++ specific flags
 set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti ${CMAKE_CXX_FLAGS}")
-set(CMAKE_CXX_FLAGS "-Wall -Werror -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}")
+#set(CMAKE_CXX_FLAGS "-Wall -Werror -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}")
 
 if(APPLE)
     set(CMAKE_MACOSX_RPATH 1)
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 03857dd..92e0e65 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -54,15 +54,16 @@ static void* send_thread(void* arg) {
     //poi_t point = calloc(1,sizeof(*point));
     location_t place = calloc(1, sizeof(*place));
 
-    char *desc = calloc(64, sizeof(char));
-    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+    char *desc = calloc(1, sizeof(char)); //calloc(64, sizeof(char));
+    //snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
 
-    char *name = calloc(64, sizeof(char));
-    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+    char *name = calloc(1, sizeof(char));//calloc(64, sizeof(char));
+    //snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
 
     place->name = name;
     place->description = desc;
-    place->extra = "extra value";
+    //place->extra = "extra value";
+    place->extra = calloc(1, 1);
     printf("TOPIC : %s\n", st_struct->topic);
 
     unsigned int msgId = 0;
@@ -77,18 +78,21 @@ static void* send_thread(void* arg) {
         if (msgId > 0) {
             place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
             place->position.lon = randCoordinate(MIN_LON, MAX_LON);
-            int nr_char = (int) randCoordinate(5, 100000);
+            //int nr_char = (int) randCoordinate(5, 100000);
+            int nr_char = 1;//(int) randCoordinate(5, 20);
             place->data = calloc(nr_char, 1);
             for (int i = 0; i < (nr_char - 1); i++) {
                 place->data[i] = i % 10 + '0';
             }
             place->data[nr_char - 1] = '\0';
             if (publish_svc->send) {
-                celix_properties_t *metadata = celix_properties_create();
-                celix_properties_set(metadata, "Key", "Value");
+                celix_properties_t *metadata = NULL;
+                //celix_properties_t *metadata = celix_properties_create();
+                //celix_properties_set(metadata, "Key", "Value");
+
                 if (publish_svc->send(publish_svc->handle, msgId, place, metadata) == 0) {
-                    printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
-                           place->position.lat, place->position.lon, place->name, place->description, nr_char);
+                   // printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                   //        place->position.lat, place->position.lon, place->name, place->description, nr_char);
                 }
             } else {
                 printf("No send for %s\n", st_struct->topic);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 411e9d2..2749794 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -48,6 +48,7 @@
 
 #define MAX_EVENTS   64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
+#define PADDING_BUFFER_SIZE 4u
 
 #define READ_STATE_INIT         0u
 #define READ_STATE_HEADER       1u
@@ -86,17 +87,24 @@ typedef struct psa_tcp_connection_entry {
     unsigned int maxMsgSize;
     unsigned int syncSize;
     unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
+    unsigned int readHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
+    void *readHeaderBuffer;
+    unsigned int writeHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
+    void *writeHeaderBuffer;
+    unsigned int readFooterSize;
+    void *readFooterBuffer;
+    unsigned int writeFooterSize;
+    void *writeFooterBuffer;
     unsigned int bufferSize;
     void *buffer;
     unsigned int bufferReadReadOffset;
     unsigned int expectedBufferReadSize;
     unsigned int msgSizeReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
+    void *writePaddingBuffer;
     unsigned int retryCount;
 } psa_tcp_connection_entry_t;
 
@@ -130,6 +138,7 @@ struct pubsub_tcpHandler {
     double rcvTimeout;
     celix_thread_t thread;
     bool running;
+    bool isEndPoint;
 };
 
 static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
@@ -324,19 +333,22 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
         handle->protocol->getHeaderSize(handle->protocol->handle, &size);
         entry->headerSize = size;
         handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
-        entry->headerBufferSize = size;
+        entry->readHeaderBufferSize = size;
+        entry->writeHeaderBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
         entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX;
         handle->protocol->getFooterSize(handle->protocol->handle, &size);
-        entry->footerSize = size;
+        entry->readFooterSize = size;
+        entry->writeFooterSize = size;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
-        if (entry->headerBufferSize) {
-            entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
-        }
-        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        if (entry->readHeaderBufferSize) entry->readHeaderBuffer = calloc(sizeof(char), entry->readHeaderBufferSize);
+        if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = calloc(sizeof(char), entry->writeHeaderBufferSize);
+        if (entry->readFooterSize) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterSize);
+        if (entry->writeFooterSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        entry->writePaddingBuffer = calloc(sizeof(char),PADDING_BUFFER_SIZE);
         pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
     }
     return entry;
@@ -348,40 +360,17 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        if (entry->url) free(entry->url);
+        if (entry->interface_url) free(entry->interface_url);
+        if (entry->fd >= 0) close(entry->fd);
+        if (entry->buffer) free(entry->buffer);
+        if (entry->readHeaderBuffer) free(entry->readHeaderBuffer);
+        if (entry->writeHeaderBuffer) free(entry->writeHeaderBuffer);
+        if (entry->readFooterBuffer) free(entry->readFooterBuffer);
+        if (entry->writeFooterBuffer) free(entry->writeFooterBuffer);
+        if (entry->readMetaBuffer) free(entry->readMetaBuffer);
+        if (entry->writeMetaBuffer) free(entry->writeMetaBuffer);
+        if (entry->writePaddingBuffer) free(entry->writePaddingBuffer);
         free(entry);
     }
 }
@@ -403,8 +392,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsign
 //
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
     int rc = 0;
-    psa_tcp_connection_entry_t *entry =
-        hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
     if (entry == NULL) {
         pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
         int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -418,7 +406,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->port_nr, interface_url,
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno,
                         strerror(errno));
                 close(fd);
             } else {
@@ -445,7 +433,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
                 entry = NULL;
             }
-            rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
             if (rc < 0) {
                 pubsub_tcpHandler_freeEntry(entry);
                 L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
@@ -652,7 +640,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int si
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
         handle->maxMsgSize = size;
-        handle->maxMsgSize = 4;
+        handle->maxMsgSize = 16;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -767,6 +755,14 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
+void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->isEndPoint = isEndPoint;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
 static inline
 int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) {
     int long nbytes = entry->expectedBufferReadSize;
@@ -775,8 +771,10 @@ int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection
         // Read the message header
         nbytes = recv(fd, &buffer[entry->bufferReadReadOffset], entry->expectedBufferReadSize, flag | MSG_NOSIGNAL);
         // Update buffer administration
-        entry->bufferReadReadOffset += nbytes;
-        entry->expectedBufferReadSize-= nbytes;
+        if (nbytes > 0) {
+            entry->bufferReadReadOffset += nbytes;
+            entry->expectedBufferReadSize -= nbytes;
+        }
     }
     return nbytes;
 }
@@ -789,9 +787,8 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
       handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
   }
   if (entry->header.header.metadataSize > 0) {
-      handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
-                                     entry->header.header.metadataSize, &entry->header);
-      entry->metaBufferSize = entry->header.header.metadataSize;
+      handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer,
+                                       entry->header.header.metadataSize, &entry->header);
   }
   if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
     struct timespec receiveTime;
@@ -809,12 +806,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
         entry->expectedBufferReadSize = entry->headerSize;
         entry->state = READ_STATE_HEADER;
       } else if (entry->state == READ_STATE_HEADER) {
-        if (entry->header.header.payloadSize) {
+        if (entry->header.header.payloadSize && entry->header.header.payloadPartSize) {
             entry->state = READ_STATE_PAYLOAD;
             entry->bufferReadReadOffset = entry->header.header.payloadOffset;
-            entry->expectedBufferReadSize = (entry->header.header.payloadPartSize) ? entry->header.header.payloadPartSize : entry->header.header.payloadSize;
+            entry->expectedBufferReadSize = entry->header.header.payloadPartSize;
             // For header less messages adjust offset and msg size;
-            if (!entry->headerBufferSize) {
+            if (!entry->readHeaderBufferSize) {
                 entry->bufferReadReadOffset += entry->headerSize;
                 entry->expectedBufferReadSize -= entry->headerSize;
             }
@@ -822,10 +819,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
             entry->state = READ_STATE_META;
             entry->expectedBufferReadSize = entry->header.header.metadataSize;
         } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) {
-            if (entry->footerSize) {
+            if (entry->readFooterSize) {
                 entry->state = READ_STATE_FOOTER;
-                entry->expectedBufferReadSize = entry->footerSize;
-            } else if (entry->header.header.isLastSegment) {
+                entry->expectedBufferReadSize = entry->readFooterSize;
+            } else if (entry->header.header.isLastSegment == 0x01) {
                 entry->state = READ_STATE_READY;
                 entry->expectedBufferReadSize = 0;
             } else {
@@ -838,10 +835,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
             entry->state = READ_STATE_META;
             entry->expectedBufferReadSize = entry->header.header.metadataSize;
         } else  {
-            if (entry->footerSize) {
+            if (entry->readFooterSize) {
                 entry->state = READ_STATE_FOOTER;
-                entry->expectedBufferReadSize = entry->footerSize;
-            } else if (entry->header.header.isLastSegment) {
+                entry->expectedBufferReadSize = entry->readFooterSize;
+            } else if (entry->header.header.isLastSegment == 0x01) {
                 entry->state = READ_STATE_READY;
                 entry->expectedBufferReadSize = 0;
             } else {
@@ -850,10 +847,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
             }
         }
       } else if (entry->state == READ_STATE_META) {
-        if (entry->footerSize) {
+        if (entry->readFooterSize) {
             entry->state = READ_STATE_FOOTER;
-            entry->expectedBufferReadSize = entry->footerSize;
-        } else if (entry->header.header.isLastSegment) {
+            entry->expectedBufferReadSize = entry->readFooterSize;
+        } else if (entry->header.header.isLastSegment == 0x01) {
             entry->state = READ_STATE_READY;
             entry->expectedBufferReadSize = 0;
         } else {
@@ -861,7 +858,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
             entry->expectedBufferReadSize = entry->headerSize;
         }
       } else if (entry->state == READ_STATE_FOOTER) {
-        if (entry->header.header.isLastSegment) {
+        if (entry->header.header.isLastSegment == 0x01) {
             entry->state = READ_STATE_READY;
         } else {
             entry->state = READ_STATE_HEADER;
@@ -890,6 +887,170 @@ static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
+int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
+    if (entry == NULL)
+        entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+    // Find FD entry
+    if (entry == NULL) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return -1;
+    }
+    // If it's not connected return from function
+    if (!entry->connected) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return -1;
+    }
+
+    if (entry->readHeaderBufferSize && entry->readHeaderBuffer)  entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize);
+
+    // Message buffer is to small, reallocate to make it bigger
+    if ((!entry->readHeaderBufferSize) && (entry->headerSize > entry->bufferSize)) {
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
+        if (buffer) {
+            entry->buffer = buffer;
+            entry->bufferSize = handle->bufferSize;
+        }
+    }
+    // Read the message
+    // Read the message
+    entry->msg.msg_iovlen = 0;
+    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer
+                                                                                   : entry->buffer;
+    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
+    entry->msg.msg_iovlen++;
+    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+    long int nbytes = UINT32_MAX;
+    char *header_buffer = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer : entry->buffer;
+    if (entry->state == READ_STATE_SYNC) {
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+        nbytes = recv(fd, header_buffer, entry->expectedBufferReadSize, flag | MSG_NOSIGNAL);
+        if (nbytes && (entry->expectedBufferReadSize <= 0))  {
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        }
+    }
+    if (entry->state == READ_STATE_HEADER) {
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
+        if (nbytes && (entry->expectedBufferReadSize <= 0))  { // Check header message buffer
+            if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                               header_buffer,
+                                               entry->headerSize,
+                                               &entry->header) != CELIX_SUCCESS) {
+                // Did not receive correct header
+                // skip sync word and try to read next header
+                if (!entry->headerError) {
+                    L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+                }
+                entry->headerError = true;
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
+            } else {
+                // Read header message from queue
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+                nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+                if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                    entry->headerError = false;
+                    entry->msgSizeReadSize = 0;
+                    // For headerless message, add header to bufferReadSize;
+                    if (!entry->readHeaderBufferSize) entry->msgSizeReadSize += nbytes;
+                    pubsub_tcpHandler_setReadStateMachine(handle, entry);
+                }
+            }
+        }
+    }
+
+    if (entry->state == READ_STATE_PAYLOAD) {
+        // Alloc message buffers
+        if (entry->header.header.payloadSize > entry->bufferSize) {
+            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize + PADDING_BUFFER_SIZE);
+            if (entry->buffer) {
+                free(entry->buffer);
+            }
+            entry->buffer = malloc((size_t) handle->bufferSize);
+            entry->bufferSize = handle->bufferSize;
+        }
+
+        //if (entry->header.header.isLastSegment) entry->expectedBufferReadSize+=4;
+        // Read payload data from queue
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+            entry->msgSizeReadSize += nbytes;
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        }
+    }
+
+    if (entry->state == READ_STATE_META) {
+        if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
+            if (entry->readMetaBuffer) {
+                free(entry->readMetaBuffer);
+                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
+                       entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
+            }
+            entry->readMetaBufferSize = entry->header.header.metadataSize + PADDING_BUFFER_SIZE;
+            entry->readMetaBuffer     = malloc((size_t) entry->readMetaBufferSize);
+        }
+
+        // Read meta data from (queue
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readMetaBuffer,0);
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+            entry->msgSizeReadSize += nbytes;
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        }
+    }
+    if (entry->state == READ_STATE_FOOTER) {
+        // Check for end of message using, footer of message. Because of streaming protocol
+        if (!entry->readFooterBuffer) entry->readFooterBuffer = malloc(entry->readFooterSize);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readFooterBuffer, 0);
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+            if (handle->protocol->decodeFooter(handle->protocol->handle,
+                                               entry->readFooterBuffer,
+                                               entry->readFooterSize,
+                                               &entry->header) == CELIX_SUCCESS) {
+                // valid footer, this means that the message is valid
+                pubsub_tcpHandler_setReadStateMachine(handle, entry);
+            } else {
+                // Did not receive correct footer
+                L_ERROR(
+                    "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
+                    entry->header.header.seqNr,
+                    entry->fd,
+                    entry->url);
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+            }
+        }
+    }
+    if (entry->state == READ_STATE_READY) {
+        // Complete message is received
+        pubsub_tcpHandler_decodePayload(handle, entry);
+        pubsub_tcpHandler_setReadStateMachine(handle, entry);
+    }
+
+    if (nbytes > 0) {
+        entry->retryCount = 0;
+    } else if (nbytes < 0) {
+        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+            // Non blocking interrupt
+            entry->retryCount = 0;
+        } else if (entry->retryCount < handle->maxRcvRetryCount) {
+            entry->retryCount++;
+            L_WARN(
+                "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+                entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+        } else {
+            L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
+                    entry->fd, handle->maxRcvRetryCount, strerror(errno));
+            nbytes = 0; //Return 0 as indicator to close the connection
+        }
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+    return (int)nbytes;
+}
+
+//
+// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
+// If the message is completely reassembled true is returned and the index and size have valid values
+//
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     celixThreadRwlock_writeLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
@@ -906,8 +1067,10 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
         return -1;
     }
 
+    if (entry->readHeaderBufferSize && entry->readHeaderBuffer)  entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize);
+
     // Message buffer is to small, reallocate to make it bigger
-    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
+    if ((!entry->readHeaderBufferSize) && (entry->headerSize > entry->bufferSize)) {
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
         if (entry->buffer)
             free(entry->buffer);
@@ -916,16 +1079,16 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     }
     // Read the message
     long int nbytes = UINT32_MAX;
-    char *header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    char *header_buffer = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer : entry->buffer;
     if (entry->state == READ_STATE_SYNC) {
         nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
-        if (nbytes > 0) {
+        if (nbytes && (entry->expectedBufferReadSize <= 0))  {
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
     if (entry->state == READ_STATE_HEADER) {
         nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
-        if (nbytes >= entry->headerSize) { // Check header message buffer
+        if (nbytes && (entry->expectedBufferReadSize <= 0))  { // Check header message buffer
             if (handle->protocol->decodeHeader(handle->protocol->handle,
                                                header_buffer,
                                                entry->headerSize,
@@ -945,8 +1108,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
                     entry->headerError = false;
                     entry->msgSizeReadSize = 0;
                     // For headerless message, add header to bufferReadSize;
-                    if (!entry->headerBufferSize)
-                        entry->msgSizeReadSize += nbytes;
+                    if (!entry->readHeaderBufferSize) entry->msgSizeReadSize += nbytes;
                     pubsub_tcpHandler_setReadStateMachine(handle, entry);
                 }
             }
@@ -956,51 +1118,61 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     if (entry->state == READ_STATE_PAYLOAD) {
         // Alloc message buffers
         if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-            if (entry->buffer)
+            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize + PADDING_BUFFER_SIZE);
+            if (entry->buffer) {
                 free(entry->buffer);
+            }
             entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
         }
 
+        //if (entry->header.header.isLastSegment) entry->expectedBufferReadSize+=4;
         // Read payload data from queue
         nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
-        if (nbytes > 0) {
-            if (nbytes >= entry->header.header.payloadPartSize) {
-                entry->msgSizeReadSize += nbytes;
-                pubsub_tcpHandler_setReadStateMachine(handle, entry);
-            }
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+            entry->msgSizeReadSize += nbytes;
+            pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
 
     if (entry->state == READ_STATE_META) {
-        if (entry->header.header.metadataSize > entry->metaBufferSize) {
-            if (entry->metaBuffer) {
-                free(entry->metaBuffer);
-                entry->metaBuffer     = malloc((size_t) entry->header.header.metadataSize);
-                entry->metaBufferSize = entry->header.header.metadataSize;
+        if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
+            if (entry->readMetaBuffer) {
+                free(entry->readMetaBuffer);
                 L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                       entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+                       entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
             }
+            entry->readMetaBufferSize = entry->header.header.metadataSize + PADDING_BUFFER_SIZE;
+            entry->readMetaBuffer     = malloc((size_t) entry->readMetaBufferSize);
         }
 
         // Read meta data from (queue
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0);
-        if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) {
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readMetaBuffer,0);
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
             entry->msgSizeReadSize += nbytes;
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
     if (entry->state == READ_STATE_FOOTER) {
         // Check for end of message using, footer of message. Because of streaming protocol
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0);
-        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-            // valid footer, this means that the message is valid
-            pubsub_tcpHandler_setReadStateMachine(handle, entry);
-        } else {
-            // Did not receive correct header
-            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
-            pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+        if (!entry->readFooterBuffer) entry->readFooterBuffer = malloc(entry->readFooterSize);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readFooterBuffer, 0);
+        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+            if (handle->protocol->decodeFooter(handle->protocol->handle,
+                                               entry->readFooterBuffer,
+                                               entry->readFooterSize,
+                                               &entry->header) == CELIX_SUCCESS) {
+                // valid footer, this means that the message is valid
+                pubsub_tcpHandler_setReadStateMachine(handle, entry);
+            } else {
+                // Did not receive correct footer
+                L_ERROR(
+                    "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
+                    entry->header.header.seqNr,
+                    entry->fd,
+                    entry->url);
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+            }
         }
     }
     if (entry->state == READ_STATE_READY) {
@@ -1114,7 +1286,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
     int nofConnToClose = 0;
     if (handle) {
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
-        size_t max_msg_iov_len = IOV_MAX - 2;
+        size_t max_msg_iov_len = IOV_MAX - 3; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
@@ -1136,11 +1308,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
                 handle->protocol->encodeMetadata(handle->protocol->handle, message,
                                                  &metadataData,
                                                  &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                entry->writeMetaBufferSize = MAX(metadataSize, entry->writeMetaBufferSize);
+                if (metadataData && entry->writeMetaBuffer != metadataData) entry->writeMetaBuffer = metadataData;
             }
             message->header.metadataSize = metadataSize;
             size_t totalMessageSize = payloadSize + metadataSize;
@@ -1156,15 +1330,19 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
 
             void *footerData = NULL;
             size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
+            if (entry->writeFooterSize) {
+                footerDataSize = entry->writeFooterSize;
+                footerData = entry->writeFooterBuffer;
                 handle->protocol->encodeFooter(handle->protocol->handle, message,
                                                  &footerData,
                                                  &footerDataSize);
-                entry->footerSize = footerDataSize;
+                entry->writeFooterSize = MAX(footerDataSize, entry->writeFooterSize);
+                if (footerData && entry->writeFooterBuffer != footerData) entry->writeFooterBuffer = footerData;
             }
 
             size_t msgSize = 0;
+            size_t msgPayloadSize = 0;
+            size_t msgMetaDataSize = 0;
             size_t msgIovLen = 0;
             long int nbytes = UINT32_MAX;
             while (msgSize < totalMessageSize && nbytes > 0) {
@@ -1176,22 +1354,26 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 msg.msg_flags = flags;
                 msg.msg_iov = msg_iov;
                 size_t msgPartSize = 0;
+                size_t msgMetaDataPartSize = 0;
                 message->header.payloadPartSize = 0;
                 message->header.payloadOffset = 0;
                 message->header.metadataSize = 0;
+                message->header.metadataPartSize = 0;
+                message->header.metadataOffset = 0;
                 message->header.isLastSegment = 0;
 
                 // Write generic seralized payload in vector buffer
-                if (msgSize < payloadSize) {
+                if (msgPayloadSize < payloadSize) {
                     if (payloadSize && payloadData) {
-                        char *payloadDataBuffer = payloadData;
+                        char *buffer = payloadData;
                         msg.msg_iovlen++;
-                        msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize];
-                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize);
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadSize];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize), entry->maxMsgSize);
                         msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                         message->header.payloadPartSize = msgPartSize;
-                        message->header.payloadOffset = msgSize;
-                        msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        message->header.payloadOffset = msgPayloadSize;
+                        msgPayloadSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msgSize = msgPayloadSize;
                     } else {
                         // copy serialized vector into vector buffer
                         for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
@@ -1202,23 +1384,42 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                                 break;
                             msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                         }
+                        message->header.payloadOffset = msgPayloadSize;
                         message->header.payloadPartSize = msgPartSize;
-                        message->header.payloadOffset = msgSize;
-                        msgSize += msgPartSize;
+                        msgPayloadSize  += message->header.payloadPartSize;
+                        msgSize = msgPayloadSize;
                         msgIovLen += (msg.msg_iovlen - 1);
                     }
+                    // iov structures are aligned to 4 bytes.
+                    // Add padding to read correct footer.
+                    //unsigned int padding = msgPartSize & ( 0x4-1 );
+                    //if (padding) {
+                    //    msg.msg_iovlen++;
+                    //    msg.msg_iov[msg.msg_iovlen].iov_len  = 0x4 - padding;
+                    //    msg.msg_iov[msg.msg_iovlen].iov_base = entry->writePaddingBuffer;
+                    //    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    //    message->header.payloadPartSize= msgPartSize;
+                   // }
                 }
 
                 // Write optional metadata in vector buffer
-                if ((msgSize >= payloadSize) &&
+                if ((msgPayloadSize >= payloadSize) &&
+                    (msgMetaDataSize < metadataSize) &&
                     (msgPartSize < entry->maxMsgSize) &&
-                    (metadataSize && metadataData)) {
-                    msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-                    message->header.metadataSize = metadataSize;
-                    msgSize += metadataSize;
+                    (msg.msg_iovlen < max_msg_iov_len)) {
+                    if (metadataSize && metadataData) {
+                        char *buffer = payloadData;
+                        msg.msg_iovlen++;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgMetaDataSize];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((metadataSize - msgMetaDataSize), entry->maxMsgSize);
+                        msgMetaDataPartSize = msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msgPartSize += msgMetaDataPartSize;
+                        message->header.metadataSize = metadataSize;
+                        message->header.metadataPartSize = msgMetaDataPartSize;
+                        message->header.metadataOffset = msgMetaDataSize;
+                        msgMetaDataSize += msgMetaDataPartSize;
+                        msgSize += msgMetaDataPartSize;
+                    }
                 }
                 if (msgSize >= totalMessageSize) {
                     message->header.isLastSegment = 0x1;
@@ -1233,17 +1434,19 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 }
 
                 void *headerData = NULL;
-                size_t headerSize = 0;
+                size_t headerSize = entry->writeHeaderBufferSize;
                 // check if header is not part of the payload (=> headerBufferSize = 0)s
-                if (entry->headerBufferSize) {
-                    headerData = entry->headerBuffer;
+                if (entry->writeHeaderBufferSize) {
+                    headerSize = entry->writeHeaderBufferSize;
+                    headerData = entry->writeHeaderBuffer;
                     // Encode the header, with payload size and metadata size
                     handle->protocol->encodeHeader(handle->protocol->handle, message,
                                                    &headerData,
                                                    &headerSize);
-                    entry->headerBufferSize = headerSize;
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) entry->writeHeaderBuffer = headerData;
                 }
-                if (!entry->headerBufferSize) {
+                if (!entry->writeHeaderBufferSize) {
                     // Skip header buffer, when header is part of payload;
                     msg.msg_iov = &msg_iov[1];
                 } else if (headerSize && headerData) {
@@ -1256,7 +1459,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     L_ERROR("[TCP Socket] No header buffer is generated");
                     msg.msg_iovlen = 0;
                 }
-                nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
+                //nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
                 //  When a specific socket keeps reporting errors can indicate a subscriber
                 //  which is not active anymore, the connection will remain until the retry
                 //  counter exceeds the maximum retry count.
@@ -1279,20 +1483,10 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                         L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
                     }
                 }
-                // Release data
-                if (headerData && headerData != entry->headerBuffer) {
-                    free(headerData);
-                }
                 // Note: serialized Payload is deleted by serializer
                 if (payloadData && (payloadData != message->payload.payload)) {
                     free(payloadData);
                 }
-                if (metadataData && metadataData != entry->metaBuffer) {
-                    free(metadataData);
-                }
-                if (footerData && footerData != entry->footerBuffer) {
-                    free(footerData);
-                }
             }
         }
     }
@@ -1357,7 +1551,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+        event.events = EPOLLRDHUP | EPOLLERR;
+        if (handle->isEndPoint) event.events |= EPOLLIN;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index bb74387..a08911c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -65,6 +65,7 @@ void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint);
 
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 7ae65d1..218b47b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -208,6 +208,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
         pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        pubsub_tcpHandler_setEndPoint(sender->socketHandler, isEndpoint);
     }
 
     //setting up tcp socket for TCP TopicSender
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1e95c0b..8c2c573 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -601,6 +601,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                 message.header.metadataSize = entry->metadataBufferSize;
                 message.header.payloadPartSize = payloadLength;
                 message.header.payloadOffset = 0;
+                message.header.metadataPartSize = entry->metadataBufferSize;
+                message.header.metadataOffset = 0;
                 message.header.isLastSegment = 1;
                 message.header.convertEndianess = 0;
 
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
index 1d123d7..eb3755b 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c
@@ -192,7 +192,11 @@ celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message,
 
 celix_status_t pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) {
     celix_status_t status = CELIX_SUCCESS;
-
+    if (outLength == NULL) {
+        status = CELIX_ILLEGAL_ARGUMENT;
+        return status;
+    }
+    *outLength = (*outLength == 0) ? 1024 : *outLength;
     size_t lineMemoryLength = *outBuffer == NULL ? 1024 : *outLength;
     unsigned char *line = *outBuffer == NULL ? calloc(1, lineMemoryLength) : *outBuffer;
     size_t idx = 4;
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
index a05e095..f87cd7c 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c
@@ -46,7 +46,7 @@ celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protoco
 }
 
 celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void* handle, size_t *length) {
-    *length = sizeof(int) * 9 + sizeof(short) * 2; // header + sync + version = 36
+    *length = sizeof(int) * 11 + sizeof(short) * 2; // header + sync + version = 48
     return CELIX_SUCCESS;
 }
 
@@ -99,6 +99,8 @@ celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol
         idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataSize);
         idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadPartSize);
         idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadOffset);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataPartSize);
+        idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataOffset);
         idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.isLastSegment);
         *outLength = idx;
     }
@@ -160,6 +162,8 @@ celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, siz
                 idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataSize);
                 idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadPartSize);
                 idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadOffset);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataPartSize);
+                idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataOffset);
                 pubsubProtocol_readInt(data, idx, convert, &message->header.isLastSegment);
             }
         }
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
index ad1a387..2c23b6f 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h
@@ -47,11 +47,13 @@ struct pubsub_protocol_header {
      *  Note: this attribute is transmitted using the wire protocol, the sync word is used to determine endianess conversion */
     uint32_t convertEndianess;
 
-    /** Optional message segmentation attributes, these attributes are only used/written by the protocol admin.
+    /** pptional message segmentation attributes, these attributes are only used/written by the protocol admin.
      *  When message segmentation is supported by the protocol admin */
     uint32_t seqNr;
     uint32_t payloadPartSize;
     uint32_t payloadOffset;
+    uint32_t metadataPartSize;
+    uint32_t metadataOffset;
     uint32_t isLastSegment;
 };
 


Mime
View raw message