celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [celix] branch develop updated: Fixed TCP transmissions (#50)
Date Mon, 16 Sep 2019 20:58:47 GMT
This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new ec12aaa  Fixed TCP transmissions (#50)
ec12aaa is described below

commit ec12aaa78439b5a1b7f316b6955041e4cbd0ad8e
Author: rbulter <roybulter@gmail.com>
AuthorDate: Mon Sep 16 22:58:42 2019 +0200

    Fixed TCP transmissions (#50)
    
    * Fix TCP high load traffic
    * Don't remove connection
    * Remove epoll event EPOLLET
    * Remove test
    * Refactor read handler function
    * Reduce processing Load
---
 CMakeLists.txt                                     |   6 +-
 .../src/pubsub_psa_tcp_constants.h                 |  21 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 677 +++++++++++++--------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   5 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_msg_header.h   |  23 +-
 .../src/pubsub_tcp_topic_receiver.c                |  13 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  24 +-
 7 files changed, 476 insertions(+), 293 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bac90d..76bf074 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,8 +38,10 @@ set(ENABLE_MORE_WARNINGS OFF)
 IF (ANDROID)
     set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -Wall ${CMAKE_C_FLAGS}")
 ELSE ()
-    set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -Wall -Werror -fPIC ${CMAKE_C_FLAGS}")
-    set(CMAKE_CXX_FLAGS "-std=c++11 -Wall -Wextra -Weffc++ -fno-rtti -fno-exceptions ${CMAKE_CXX_FLAGS}")
+    set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -fPIC ${CMAKE_C_FLAGS}")
+    set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti -fno-exceptions ${CMAKE_CXX_FLAGS}")
+    set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
+    set(CMAKE_CXX_FLAGS "-Wall -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}")
     set(CMAKE_C_FLAGS_DEBUG "-g -DDEBUG ${CMAKE_C_FLAGS}")
     set(CMAKE_CXX_FLAGS_DEBUG "-g -DDEBUG ${CMAKE_CXX_FLAGS}")
 ENDIF()
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 42ad2fa..a6bfc81 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -30,9 +30,9 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       16
-#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        6500000
-#define PSA_TCP_DEFAULT_TIMEOUT                 500
+#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
+#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        1024
+#define PSA_TCP_DEFAULT_TIMEOUT                 2000
 
 #define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE        30
 #define PSA_TCP_DEFAULT_QOS_CONTROL_SCORE       70
@@ -93,6 +93,21 @@
 #define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER  "server"
 #define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT  "client"
 
+
+/**
+ * The TCP admin supports sending messages without a pubsub header.
+ * In this case a message ID must be part of the message to be able to distinguish
+ * the different messages. The location of the message ID is configured with PUBSUB_TCP_MESSAGE_ID_OFFSET.
+ * The size of the message ID in bytes is specified with PUBSUB_TCP_MESSAGE_ID_SIZE.
+ * These properties can be set in the topic properties.
+ */
+#define PUBSUB_TCP_BYPASS_HEADER          "tcp.static.bypass.header"
+#define PUBSUB_TCP_DEFAULT_BYPASS_HEADER  false
+#define PUBSUB_TCP_MESSAGE_ID_OFFSET      "tcp.static.message_id.offset"
+#define PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET 0
+#define PUBSUB_TCP_MESSAGE_ID_SIZE        "tcp.static.message_id.size"
+#define PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE   4
+
 /**
  * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
  * internal TCP threads.
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index ef9ad60..42a7061 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -46,7 +46,14 @@
 #define IP_HEADER_SIZE  20
 #define TCP_HEADER_SIZE 20
 #define MAX_EPOLL_EVENTS   64
-#define MAX_MSG_VECTOR_LEN 256
+#define MAX_MSG_VECTOR_LEN 64
+#define MAX_DEFAULT_BUFFER_SIZE 4u
+
+#define READ_STATE_INIT   0u
+#define READ_STATE_HEADER 1u
+#define READ_STATE_DATA   2u
+#define READ_STATE_READY  3u
+#define READ_STATE_FIND_HEADER 4u
 
 #define L_DEBUG(...) \
     logHelper_log(handle->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -57,47 +64,52 @@
 #define L_ERROR(...) \
     logHelper_log(handle->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
+
+typedef struct psa_tcp_connection_entry {
+    char *url;
+    int fd;
+    struct sockaddr_in addr;
+    socklen_t len;
+    bool connected;
+    unsigned int bufferSize;
+    char *buffer;
+    int bufferReadSize;
+    int expectedReadSize;
+    int readState;
+    pubsub_tcp_msg_header_t header;
+
+} psa_tcp_connection_entry_t;
+
 struct pubsub_tcpHandler {
-  array_list_pt bufferLists;
-  unsigned int bufferIdx;
   unsigned int readSeqNr;
-  unsigned int writeSeqNr;
   unsigned int msgIdOffset;
+  unsigned int msgIdSize;
   bool bypassHeader;
+  bool useBlocking;
   celix_thread_rwlock_t dbLock;
   unsigned int timeout;
   hash_map_t *url_map;
   hash_map_t *fd_map;
   int efd;
-  int fd;
-  char *url;
   pubsub_tcpHandler_connectMessage_callback_t connectMessageCallback;
   pubsub_tcpHandler_connectMessage_callback_t disconnectMessageCallback;
   void *connectPayload;
   pubsub_tcpHandler_processMessage_callback_t processMessageCallback;
   void *processMessagePayload;
   log_helper_t *logHelper;
-  unsigned int default_bufferSize;
-  unsigned int default_buffer;
-};
-
-typedef struct pubsub_tcpBufferPartList {
-  pubsub_tcp_msg_header_t default_header;
   unsigned int bufferSize;
-  char *buffer;
-} pubsub_tcpBufferPartList_t;
-
+  unsigned int maxNofBuffer;
+  psa_tcp_connection_entry_t own;
+};
 
-typedef struct psa_tcp_connection_entry {
-  char *url;
-  int fd;
-  struct sockaddr_in addr;
-  socklen_t len;
-} psa_tcp_connection_entry_t;
 
 static inline int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostname, int port, struct sockaddr_in *inp);
 static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
+//static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd);
+static inline void pubsub_tcpHandler_setupEntry(psa_tcp_connection_entry_t* entry, int fd, char *url, unsigned int bufferSize);
+static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t* entry);
 
 
 //
@@ -106,15 +118,17 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
 pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper) {
     pubsub_tcpHandler_t *handle = calloc(sizeof(*handle), 1);
     if (handle != NULL) {
-        handle->fd = -1;
         handle->efd = epoll_create1(0);
-        handle->bufferIdx = 0;
         handle->url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
         handle->fd_map = hashMap_create(NULL, NULL, NULL, NULL);
-        handle->timeout = 500;
+        handle->timeout = 2000; // default 2 sec
         handle->logHelper = logHelper;
-        handle->default_bufferSize =sizeof(handle->default_buffer);
-        handle->default_buffer = 0;
+        handle->msgIdOffset = 0;
+        handle->msgIdSize = 4;
+        handle->bypassHeader = false;
+        handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
+        handle->maxNofBuffer = 1; // Reserved for future Use;
+        pubsub_tcpHandler_setupEntry(&handle->own, -1, NULL, MAX_DEFAULT_BUFFER_SIZE);
         celixThreadRwlock_create(&handle->dbLock, 0);
         //signal(SIGPIPE, SIG_IGN);
     }
@@ -139,26 +153,9 @@ void pubsub_tcpHandler_destroy(pubsub_tcpHandler_t *handle) {
         }
 
         if (handle->efd >= 0) close(handle->efd);
-        if (handle->url) free(handle->url);
+        pubsub_tcpHandler_freeEntry(&handle->own);
         hashMap_destroy(handle->url_map, false, false);
         hashMap_destroy(handle->fd_map, false, false);
-
-        if (handle->bufferLists != NULL) {
-            int listSize = arrayList_size(handle->bufferLists);
-            int i;
-            for (i = 0; i < listSize; i++) {
-                pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, i);
-                if (item) {
-                    if (item->buffer) {
-                        free(item->buffer);
-                        item->buffer = NULL;
-                    }
-                    free(item);
-                }
-            }
-            arrayList_destroy(handle->bufferLists);
-        }
-        handle->bufferLists = NULL;
         celixThreadRwlock_unlock(&handle->dbLock);
         celixThreadRwlock_destroy(&handle->dbLock);
         free(handle);
@@ -213,83 +210,119 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle) {
     int rc = 0;
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
-        if ((handle->efd >= 0) && (handle->fd >= 0)) {
+        if ((handle->efd >= 0) && (handle->own.fd >= 0)) {
             struct epoll_event event;
             bzero(&event, sizeof(struct epoll_event)); // zero the struct
-            rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, handle->fd, &event);
+            rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, handle->own.fd, &event);
             if (rc < 0) {
                 L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
             }
         }
-        if (handle->url) {
-            free(handle->url);
-            handle->url = NULL;
-        }
-        if (handle->fd >= 0) {
-            close(handle->fd);
-            handle->fd = -1;
-        }
+        pubsub_tcpHandler_freeEntry(&handle->own);
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return rc;
 }
 
+static inline
+void pubsub_tcpHandler_setupEntry(psa_tcp_connection_entry_t* entry, int fd, char *url, unsigned int bufferSize) {
+    entry->fd = fd;
+    if  (url) entry->url = strndup(url, 1024 * 1024);
+    if ((bufferSize > entry->bufferSize)&&(bufferSize)) {
+        entry->bufferSize = bufferSize;
+        if (entry->buffer) free(entry->buffer);
+        entry->buffer = calloc(sizeof(char), entry->bufferSize);
+    }
+    entry->connected = true;
+}
+
+static inline
+void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t* entry) {
+    if (entry->url) {
+        free(entry->url);
+        entry->url = NULL;
+    }
+    if (entry->fd >= 0) {
+        close(entry->fd);
+        entry->fd = -1;
+    }
+    if (entry->buffer) {
+        free(entry->buffer);
+        entry->buffer = NULL;
+        entry->bufferSize = 0;
+    }
+    entry->connected = false;
+}
+
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
-    pubsub_tcpHandler_url_t url_info;
-    pubsub_tcpHandler_setUrlInfo(url, &url_info);
-    psa_tcp_connection_entry_t *entry = NULL;
-    int fd = pubsub_tcpHandler_open(handle, NULL);
-    celixThreadRwlock_writeLock(&handle->dbLock);
-    int rc = fd;
-    struct sockaddr_in addr; // connector's address information
-    if (rc >= 0) {
-        rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
-        if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot create url\n");
-            close(fd);
+    int rc = 0;
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->url_map, (void *) (intptr_t) url);
+    if (entry == NULL) {
+        pubsub_tcpHandler_url_t url_info;
+        pubsub_tcpHandler_setUrlInfo(url, &url_info);
+        int fd = pubsub_tcpHandler_open(handle, NULL);
+        rc = fd;
+        struct sockaddr_in addr; // connector's address information
+        if (rc >= 0) {
+            rc = pubsub_tcpHandler_setInAddr(handle, url_info.hostname, url_info.portnr, &addr);
+            if (rc < 0) {
+                L_ERROR("[TCP Socket] Cannot create url\n");
+                close(fd);
+            }
         }
-    }
-    if (rc >= 0) {
-        rc = connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
-        if (rc < 0) {
-            //L_ERROR("[TCP Socket] Cannot connect to %s:%d\n", url_info.hostname, url_info.portnr);
-            close(fd);
-            fd = -1;
-        } else {
-            struct sockaddr_in sin;
-            socklen_t len = sizeof(sin);
-            entry = calloc(1, sizeof(*entry));
-            entry->url = strndup(url, 1024 * 1024);
-            entry->fd = fd;
-            if (getsockname(fd, (struct sockaddr *) &sin, &len) < 0) {
-                L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
-            } else if (handle->url == NULL) {
-                char *address = inet_ntoa(sin.sin_addr);
-                unsigned int port = ntohs(sin.sin_port);
-                asprintf(&handle->url, "tcp://%s:%u", address, port);
+        // Make file descriptor NonBlocking
+        if ((!handle->useBlocking) && (rc >= 0)) {
+            rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+            if (rc < 0) close(fd);
+        }
+        // Connect to sender
+        if (rc >= 0) {
+            rc = connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
+            if (rc < 0 && errno != EINPROGRESS) {
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", url_info.hostname, url_info.portnr,
+                        strerror(errno));
+                close(fd);
+                errno = 0;
+            } else {
+                struct sockaddr_in sin;
+                socklen_t len = sizeof(sin);
+                entry = calloc(1, sizeof(*entry));
+                pubsub_tcpHandler_setupEntry(entry, fd, url, handle->bufferSize);
+                entry->connected = false; // Wait till epoll event, to report connected.
+                rc = getsockname(fd, (struct sockaddr *) &sin, &len);
+                if (rc < 0) {
+                    L_ERROR("[TCP Socket] getsockname %s\n", strerror(errno));
+                    errno = 0;
+                } else if (handle->own.url == NULL) {
+                    char *address = inet_ntoa(sin.sin_addr);
+                    unsigned int port = ntohs(sin.sin_port);
+                    asprintf(&handle->own.url, "tcp://%s:%u", address, port);
+                }
             }
         }
-    }
-    if (rc >= 0) {
-        struct epoll_event event;
-        bzero(&event, sizeof(struct epoll_event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
-        event.data.fd = entry->fd;
-        rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
-        if (rc < 0) {
-            close(entry->fd);
-            free(entry->url);
-            free(entry);
-            L_ERROR("[TCP Socket] Cannot create epoll\n");
+        // Subscribe File Descriptor to epoll
+        if ((rc >= 0) && (entry)) {
+            struct epoll_event event;
+            bzero(&event, sizeof(struct epoll_event)); // zero the struct
+            event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+            event.data.fd = entry->fd;
+            rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
+            if (rc < 0) {
+                pubsub_tcpHandler_freeEntry(entry);
+                free(entry);
+                L_ERROR("[TCP Socket] Cannot create epoll %s\n", strerror(errno));
+                errno = 0;
+                entry = NULL;
+            }
         }
+        if ((rc >= 0) && (entry)) {
+            celixThreadRwlock_writeLock(&handle->dbLock);
+            hashMap_put(handle->url_map, entry->url, entry);
+            hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
+            celixThreadRwlock_unlock(&handle->dbLock);
+        }
+        pubsub_tcpHandler_free_setUrlInfo(&url_info);
     }
-    if ((rc >= 0) && (entry)) {
-        if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, false);
-        hashMap_put(handle->url_map, entry->url, entry);
-        hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
-    }
-    pubsub_tcpHandler_free_setUrlInfo(&url_info);
-    celixThreadRwlock_unlock(&handle->dbLock);
     return rc;
 }
 
@@ -320,14 +353,13 @@ static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *ha
             rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event);
             if (rc < 0) {
                 L_ERROR("[PSA TCP] Error disconnecting %s\n", strerror(errno));
+                errno = 0;
             }
         }
         if (entry->fd >= 0) {
             if (handle->disconnectMessageCallback)
                 handle->disconnectMessageCallback(handle->connectPayload, entry->url, lock);
-            close(entry->fd);
-            free(entry->url);
-            entry->url = NULL;
+            pubsub_tcpHandler_freeEntry(entry);
             free(entry);
         }
     }
@@ -343,7 +375,7 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
         bool use_handle_fd = false;
         psa_tcp_connection_entry_t *entry = NULL;
         celixThreadRwlock_readLock(&handle->dbLock);
-        if (fd != handle->fd) {
+        if (fd != handle->own.fd) {
             entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
         } else {
             use_handle_fd = true;
@@ -361,45 +393,68 @@ static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle,
     return rc;
 }
 
+static inline
+int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
+  int rc = 0;
+  int flags = fcntl(fd, F_GETFL, 0);
+  if (flags == -1) rc = flags;
+  else {
+    rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+    if (rc < 0) {
+      L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+      errno = 0;
+    }
+  }
+  return rc;
+}
+
+#ifdef USE_BLOCKING
+static inline
+int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd) {
+    int rc = 0;
+    int flags = fcntl(fd, F_GETFL, 0);
+    if (flags == -1) rc = flags;
+    else {
+        rc = fcntl(fd, F_SETFL, flags & (~O_NONBLOCK));
+        if (rc < 0) {
+            L_ERROR("[TCP Socket] Cannot set to BLOCKING epoll: %s\n", strerror(errno));
+            errno = 0;
+        }
+    }
+    return rc;
+}
+#endif
+
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
-    handle->fd = pubsub_tcpHandler_open(handle, url);
-    handle->url = strndup(url, 1024 * 1024);
-    int rc = handle->fd;
+    int fd = pubsub_tcpHandler_open(handle, url);
+    // Make handler fd entry
+    pubsub_tcpHandler_setupEntry(&handle->own, fd, url, MAX_DEFAULT_BUFFER_SIZE);
+    int rc = fd;
     celixThreadRwlock_writeLock(&handle->dbLock);
     if (rc >= 0) {
-        rc = listen(handle->fd, SOMAXCONN);
+        rc = listen(fd, SOMAXCONN);
         if (rc != 0) {
             L_ERROR("[TCP Socket] Error listen: %s\n", strerror(errno));
-            close(handle->fd);
-            handle->fd = -1;
-            free(handle->url);
-            handle->url = NULL;
+            pubsub_tcpHandler_freeEntry(&handle->own);
+            errno = 0;
         }
     }
     if (rc >= 0) {
-        int flags = fcntl(handle->fd, F_GETFL, 0);
-        if (flags == -1) {
-            rc = flags;
-        } else {
-            rc = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
-            if (rc < 0) {
-                L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll\n");
-                close(handle->fd);
-                handle->fd = -1;
-                free(handle->url);
-                handle->url = NULL;
-            }
+        rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+        if (rc < 0) {
+            pubsub_tcpHandler_freeEntry(&handle->own);
         }
     }
 
     if ((rc >= 0) && (handle->efd >= 0)) {
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
-        event.data.fd = handle->fd;
-        rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, handle->fd, &event);
+        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+        event.data.fd = fd;
+        rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, fd, &event);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot create epoll\n");
+            L_ERROR("[TCP Socket] Cannot create epoll: %s\n",strerror(errno));
+            errno = 0;
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
@@ -416,7 +471,8 @@ int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle, const char *hostnam
         if (!inet_aton(hostname, &inp->sin_addr)) {
             hp = gethostbyname(hostname);
             if (hp == NULL) {
-                L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s\n", hostname);
+                L_ERROR("[TCP Socket] set_in_addr: Unknown host name %s, %s\n", hostname, strerror(errno));
+                errno = 0;
                 return -1;
             }
             inp->sin_addr = *(struct in_addr *) hp->h_addr;
@@ -451,7 +507,7 @@ void pubsub_tcpHandler_setUrlInfo(char *url, pubsub_tcpHandler_url_t *url_info)
             url_info->hostname = strtok(strdup(hostname), ":");
             if (port) {
                 port += 1;
-                unsigned int portDigits = (unsigned int) atoi(port);
+                unsigned int portDigits = (unsigned) atoi(port);
                 if (portDigits != 0) url_info->portnr = portDigits;
             }
             free(hostname);
@@ -469,20 +525,13 @@ void pubsub_tcpHandler_free_setUrlInfo(pubsub_tcpHandler_url_t *url_info) {
 }
 
 
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, unsigned int maxNofBuffers,
+int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
+                                               unsigned int maxNofBuffers __attribute__ ((__unused__)),
                                                unsigned int bufferSize) {
     if (handle != NULL) {
-        int i = 0;
         celixThreadRwlock_writeLock(&handle->dbLock);
-        if (arrayList_create(&handle->bufferLists) != CELIX_SUCCESS) {
-            return -1;
-        }
-        for (i = 0; i < maxNofBuffers; i++) {
-            pubsub_tcpBufferPartList_t *item = calloc(1, sizeof(struct pubsub_tcpBufferPartList));
-            item->buffer = calloc(sizeof(char), bufferSize);
-            item->bufferSize = bufferSize;
-            arrayList_add(handle->bufferLists, item);
-        }
+        handle->bufferSize = bufferSize;
+        handle->maxNofBuffer = maxNofBuffers;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -496,6 +545,15 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
     }
 }
 
+void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->bypassHeader = bypassHeader;
+        handle->msgIdOffset  = msgIdOffset;
+        handle->msgIdSize  = msgIdSize;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
 
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
@@ -503,43 +561,103 @@ void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int time
 //
 int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size) {
     celixThreadRwlock_writeLock(&handle->dbLock);
-    if (handle->bufferLists == NULL) {
-        int nbytes = recv(fd, &handle->default_buffer, handle->default_bufferSize, MSG_PEEK);
+    psa_tcp_connection_entry_t *entry = NULL;
+    if (fd == handle->own.fd) entry = &handle->own;
+    else entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
+    // Find FD entry
+    if (entry == NULL) {
         celixThreadRwlock_unlock(&handle->dbLock);
-        return nbytes;
+        return -1;
     }
-    int listSize = arrayList_size(handle->bufferLists);
-    pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, handle->bufferIdx);
-    if (!handle->bypassHeader) {
-        // Only read the header, we don't know yet where to store the payload
-        int nbytes = recv(fd, item->buffer, sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int), MSG_PEEK);
-        if (nbytes < 0) {
-            L_ERROR("[TCP Socket] read error \n");
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-        }
-        unsigned int *pBuffer_size = ((unsigned int *) &item->buffer[sizeof(pubsub_tcp_msg_header_t)]);
-        unsigned int buffer_size = *pBuffer_size + sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
-        if (buffer_size > item->bufferSize) {
-            free(item->buffer);
-            item->buffer = calloc(buffer_size, sizeof(char));
-            item->bufferSize = buffer_size;
+    // If it's not connected return from function
+    if (!entry->connected) {
+        celixThreadRwlock_unlock(&handle->dbLock);
+        return -1;
+    }
+
+    // In init state
+    if (!entry->readState) {
+        entry->bufferReadSize = 0;
+        if (!handle->bypassHeader) {
+            // First start looking for header
+            entry->readState = READ_STATE_HEADER;
+            entry->expectedReadSize = sizeof(pubsub_tcp_msg_header_t);
+        } else {
+            // When no header use Max buffer size
+            entry->readState = READ_STATE_READY;
+            entry->expectedReadSize = entry->bufferSize;
         }
     }
-    int nbytes = recv(fd, item->buffer, item->bufferSize, 0);
+    // Read the message
+    int nbytes = recv(fd, &entry->buffer[entry->bufferReadSize], entry->expectedReadSize, 0);
     if (nbytes < 0) {
-        L_ERROR("[TCP Socket] read error \n");
-        celixThreadRwlock_unlock(&handle->dbLock);
-        return nbytes;
+        // Handle Socket error, when nbytes == 0 => Connection is lost
+        if (nbytes < 0) {
+            L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
+            errno = 0;
+        }
     }
-    if (!handle->bypassHeader) {
-        nbytes -= sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int);
+    if ((!handle->bypassHeader)&&(nbytes>0)) {
+        // Update buffer administration
+        entry->bufferReadSize += nbytes;
+        entry->expectedReadSize -= nbytes;
+        // When expected data is read then update state
+        if (entry->expectedReadSize <= 0) {
+            pubsub_tcp_msg_header_t *pHeader = (pubsub_tcp_msg_header_t *) entry->buffer;
+            if (entry->readState == READ_STATE_FIND_HEADER) {
+                // When header marker is not found, start finding header
+                if (pHeader->marker_start == MARKER_START_PATTERN) {
+                    // header marker is found, then read the remaining data of the header and update to HEADER State
+                    entry->expectedReadSize = sizeof(pubsub_tcp_msg_header_t) - sizeof(unsigned int);
+                    entry->readState = READ_STATE_HEADER;
+                } else {
+                    // keep looking for the header marker
+                    entry->bufferReadSize = 0;
+                    entry->expectedReadSize = sizeof(unsigned int);
+                }
+                // Check if the header contains the correct markers
+            } else if ((pHeader->marker_start != MARKER_START_PATTERN) || (pHeader->marker_end != MARKER_END_PATTERN)) {
+                // When markers are not correct, find a new marker and update state to FIND Header
+                L_ERROR(
+                    "[TCP Socket] Read Header: Marker (%d)  start: 0x%08X != 0x%08X stop: 0x%08X != 0x%08X errno: %s",
+                    handle->readSeqNr, pHeader->marker_start, MARKER_START_PATTERN, pHeader->marker_end,
+                    MARKER_END_PATTERN, strerror(errno));
+                entry->bufferReadSize = 0;
+                entry->expectedReadSize = sizeof(unsigned int);
+                entry->readState = READ_STATE_FIND_HEADER;
+            } else if (entry->readState == READ_STATE_HEADER) {
+                // Header is found, read the data from the socket, update state to READ_STATE_DATA
+                int buffer_size = pHeader->bufferSize + entry->bufferReadSize;
+                // When buffer is not big enough, reallocate buffer
+                if (buffer_size > entry->bufferSize) {
+                    entry->buffer = realloc(entry->buffer, buffer_size);
+                    entry->bufferSize = buffer_size;
+                    //L_WARN("[TCP Socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url, entry->bufferSize, buffer_size);
+                }
+                // Set data read size
+                entry->expectedReadSize = pHeader->bufferSize;
+                entry->readState++;
+                // The data is read, update administration and set state to READ_STATE_READY
+            } else if (entry->readState == READ_STATE_DATA) {
+                handle->readSeqNr = pHeader->seqNr;
+                //fprintf(stdout, "ReadSeqNr: Count: %d\n", handle->readSeqNr);
+                nbytes -= sizeof(pubsub_tcp_msg_header_t);
+                if (nbytes == 0) {
+                    errno = 0;
+                }
+                // if buffer does not contain header, reset buffer
+                if (nbytes < 0) {
+                    L_ERROR("[TCP Socket] incomplete message\n");
+                    entry->readState = READ_STATE_INIT;
+                } else {
+                    entry->readState++;
+                }
+            }
+        }
     }
-
-    *index = handle->bufferIdx;
+    *index = 0;
+    // if read state is not ready, don't process buffer
     *size = nbytes;
-    handle->bufferIdx++;
-    handle->bufferIdx = handle->bufferIdx % listSize;
     celixThreadRwlock_unlock(&handle->dbLock);
     return nbytes;
 }
@@ -547,25 +665,31 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne
 //
 // Read out the message which is indicated available by the largeUdp_dataAvailable function
 //
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, unsigned int index, pubsub_tcp_msg_header_t **header,
-                           void **buffer, unsigned int size) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__ ((__unused__)),
+                           pubsub_tcp_msg_header_t **header, void **buffer, unsigned int size) {
     int result = 0;
     celixThreadRwlock_readLock(&handle->dbLock);
-    pubsub_tcpBufferPartList_t *item = arrayList_get(handle->bufferLists, index);
-    if (item) {
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->fd_map, (void *) (intptr_t)fd);
+    if (entry == NULL) result = -1;
+    if (entry) {
+        result = (!entry->connected) ? -1 : result;
+        result = (entry->readState != READ_STATE_READY) ? -1 : result;
+    }
+    if (!result) {
         if (handle->bypassHeader) {
-            *header = &item->default_header;
-            *buffer = item->buffer;
-            item->default_header.type = (unsigned int) item->buffer[handle->msgIdOffset];
-            item->default_header.seqNr = handle->readSeqNr++;
-            item->default_header.sendTimeNanoseconds = 0;
-            item->default_header.sendTimeNanoseconds = 0;
+            *header = &entry->header;
+            *buffer = entry->buffer;
+            entry->header.type = (unsigned int) entry->buffer[handle->msgIdOffset];
+            entry->header.seqNr = handle->readSeqNr++;
+            entry->header.sendTimeNanoseconds = 0;
+            entry->header.sendTimeNanoseconds = 0;
+            entry->readState = READ_STATE_INIT;
         } else {
-            *header = (pubsub_tcp_msg_header_t *) item->buffer;
-            *buffer = &item->buffer[sizeof(pubsub_tcp_msg_header_t) + sizeof(unsigned int)];
+            *header = (pubsub_tcp_msg_header_t *) entry->buffer;
+            *buffer = entry->buffer + sizeof(pubsub_tcp_msg_header_t);
+            entry->readState = READ_STATE_INIT;
         }
-    } else {
-        result = -1;
+        entry->readState = READ_STATE_INIT;
     }
     celixThreadRwlock_unlock(&handle->dbLock);
     return result;
@@ -602,7 +726,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
     celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int written = 0;
-
+    header->marker_start = MARKER_START_PATTERN;
+    header->marker_end   = MARKER_END_PATTERN;
+    header->bufferSize   = size;
     hash_map_iterator_t iter = hashMapIterator_construct(handle->fd_map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
@@ -619,151 +745,178 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
         msg.msg_controllen = 0;
         if (!handle->bypassHeader) {
             msg.msg_iov[0].iov_base = header;
-            msg.msg_iov[0].iov_len = sizeof(*header);
-            msg.msg_iov[1].iov_base = &size;
-            msg.msg_iov[1].iov_len = sizeof(size);
-            msg.msg_iov[2].iov_base = buffer;
-            msg.msg_iov[2].iov_len = size;
-            msg.msg_iovlen = 3;
+            msg.msg_iov[0].iov_len = sizeof(pubsub_tcp_msg_header_t);
+            msg.msg_iov[1].iov_base = buffer;
+            msg.msg_iov[1].iov_len = size;
+            msg.msg_iovlen = 2;
         } else {
             msg.msg_iov[0].iov_base = buffer;
             msg.msg_iov[0].iov_len = size;
             msg.msg_iovlen = 1;
         }
 
-        int nbytes = 0; //
+        int nbytes = 0;
         if (entry->fd >= 0) nbytes = sendmsg(entry->fd, &msg, MSG_NOSIGNAL);
-
         //  Several errors are OK. When speculative write is being done we may not
-        //  be able to write a single byte from the socket. Also, SIGSTOP issued
-        //  by a debugging tool can result in EINTR error.
-        if (nbytes == -1
-            && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
-            result = 0;
-            break;
-        }
+        //  be able to write a single byte to the socket buffer. (socket buffer full)
+        //  In this case when socket is not blocking, exit write function.
+        //  Also, SIGSTOP issued by a debugging tool can result in an EINTR error.
         if (nbytes == -1) {
+            result = ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) ? 0 : -1;
             L_ERROR("[TCP Socket] Cannot send msg %s\n", strerror(errno));
-            result = -1;
-            break;
+            errno = 0;
+            continue;
+        }
+        int msgSize = 0;
+        for (int i = 0; i < msg.msg_iovlen; i++) {
+            msgSize+=msg.msg_iov[i].iov_len;
+        }
+        if (nbytes != msgSize) {
+            L_ERROR("[TCP Socket] MsgSize not correct: %d != nBytess\n", msgSize, nbytes);
+            continue;
         }
         written += nbytes;
-        handle->writeSeqNr++;
     }
     celixThreadRwlock_unlock(&handle->dbLock);
     return (result == 0 ? written : result);
 }
 
 const char *pubsub_tcpHandler_url(pubsub_tcpHandler_t *handle) {
-    return handle->url;
+    return handle->own.url;
 }
 
 int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
     int rc = 0;
     if (handle->efd >= 0) {
         struct epoll_event events[MAX_EPOLL_EVENTS];
-        int nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
+        int nof_events = 0;
+        nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
         if (nof_events < 0) {
-            L_ERROR("[TCP Socket] Cannot create epoll wait\n");
-            return nof_events;
+          L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
+          errno = 0;
         }
-        int i = 0;
-        for (i = 0; i < nof_events; i++) {
-            if ((handle->fd >= 0) && (events[i].data.fd == handle->fd)) {
+        for (int i = 0; i < nof_events; i++) {
+            if ((handle->own.fd >= 0) && (events[i].data.fd == handle->own.fd)) {
                 celixThreadRwlock_writeLock(&handle->dbLock);
                 // new connection available
                 struct sockaddr_in their_addr;
                 socklen_t len = sizeof(struct sockaddr_in);
-                rc = accept(handle->fd, &their_addr, &len);
+                int fd = accept(handle->own.fd, &their_addr, &len);
+                rc = fd;
                 if (rc == -1) {
-                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
-                        // already closed
-                    } else
-                        L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
-                } else {
+                  L_ERROR("[TCP Socket] accept failed: %s\n", strerror(errno));
+                  errno = 0;
+                }
+                // Make file descriptor NonBlocking
+                if ((!handle->useBlocking) && (rc >= 0)) {
+                    rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
+                    if (rc < 0) pubsub_tcpHandler_freeEntry(&handle->own);
+                }
+                if (rc >= 0){
                     // handle new connection:
                     // add it to reactor, etc
                     struct epoll_event event;
                     bzero(&event, sizeof(event)); // zero the struct
-
                     char *address = inet_ntoa(their_addr.sin_addr);
                     unsigned int port = ntohs(their_addr.sin_port);
                     char *url = NULL;
                     asprintf(&url, "tcp://%s:%u", address, port);
                     psa_tcp_connection_entry_t *entry = calloc(1, sizeof(*entry));
+                    pubsub_tcpHandler_setupEntry(entry, fd, url, MAX_DEFAULT_BUFFER_SIZE);
                     entry->addr = their_addr;
-                    entry->len = len;
-                    entry->url = url;
-                    entry->fd = rc;
-                    event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET | EPOLLOUT;
+                    entry->len  = len;
+                    entry->connected = false;
+                    event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
                     event.data.fd = entry->fd;
+                    // Register Read to epoll
                     rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
                     if (rc < 0) {
-                        close(entry->fd);
-                        free(entry->url);
+                        pubsub_tcpHandler_freeEntry(entry);
                         free(entry);
                         L_ERROR("[TCP Socket] Cannot create epoll\n");
                     } else {
-                        if (handle->connectMessageCallback)
-                            handle->connectMessageCallback(handle->connectPayload, entry->url, true);
                         hashMap_put(handle->fd_map, (void *) (intptr_t) entry->fd, entry);
                         hashMap_put(handle->url_map, entry->url, entry);
-                        fprintf(stdout, "[TCP Socket] New connection to url: %s: \n", url);
+                        L_INFO("[TCP Socket] New connection to url: %s: \n", url);
                     }
+                    free(url);
                 }
                 celixThreadRwlock_unlock(&handle->dbLock);
             } else if (events[i].events & EPOLLIN) {
-                int err = 0;
-                socklen_t len = sizeof(int);
-                rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
-                if (rc != 0) {
-                    L_ERROR("[TCP Socket]: ERROR read from socket \n");
-                    continue;
-                }
-                unsigned int index = 0;
-                unsigned int size = 0;
-                rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
-                if (rc == 0) {
-                    pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
-                    continue;
-                } else if (rc < 0) {
-                    continue;
-                }
-                // Handle data
-                pubsub_tcp_msg_header_t *msgHeader = NULL;
-                void *buffer = NULL;
-                rc = pubsub_tcpHandler_read(handle, index, &msgHeader, &buffer, size);
-                if (rc != 0) {
-                    L_ERROR("[TCP Socket]: ERROR read with index %d\n", index);
-                    continue;
-                }
-                celixThreadRwlock_readLock(&handle->dbLock);
-                if (handle->processMessageCallback) {
+                int count = 0;
+                bool isReading = true;
+                while(isReading) {
+                  unsigned int index = 0;
+                  unsigned int size = 0;
+                  isReading = (handle->useBlocking) ? false : isReading;
+                  count++;
+                  rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
+                  if (rc <= 0) {
+                        // close connection.
+                        if (rc == 0) {
+                            pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
+                        }
+                        isReading = false;
+                        break;
+                  }
+                  // Handle data
+                  void *buffer = NULL;
+                  pubsub_tcp_msg_header_t *msgHeader = NULL;
+                  rc = pubsub_tcpHandler_read(handle, events[i].data.fd, index, &msgHeader, &buffer, size);
+                    if (rc != 0) {
+                        isReading = false;
+                        break;
+                  }
+                  celixThreadRwlock_readLock(&handle->dbLock);
+                  if ((handle->processMessageCallback)&&(buffer)) {
                     struct timespec receiveTime;
                     clock_gettime(CLOCK_REALTIME, &receiveTime);
-                    handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size,
-                                                   &receiveTime);
+                    handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size, &receiveTime);
+                    isReading = false;
+                  }
+                  celixThreadRwlock_unlock(&handle->dbLock);
                 }
-                celixThreadRwlock_unlock(&handle->dbLock);
             } else if (events[i].events & EPOLLOUT) {
                 int err = 0;
                 socklen_t len = sizeof(int);
                 rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
                 if (rc != 0) {
-                    L_ERROR("[TCP Socket]: ERROR read from socket \n");
+                    L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
+                    errno = 0;
                     continue;
                 }
+                celixThreadRwlock_readLock(&handle->dbLock);
+                psa_tcp_connection_entry_t *entry = hashMap_get(handle->fd_map, (void *) (intptr_t) events[i].data.fd);
+                if (entry)
+                  if ((!entry->connected)) {
+                      // tell sender that a receiver is connected
+                      if (handle->connectMessageCallback) handle->connectMessageCallback(handle->connectPayload, entry->url, false);
+                      entry->connected = true;
+                      struct epoll_event event;
+                      bzero(&event, sizeof(event)); // zero the struct
+                      event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+                      event.data.fd = events[i].data.fd;
+                      // Register Modify epoll
+                      rc = epoll_ctl(handle->efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
+                      //pubsub_tcpHandler_makeBlocking(handle, events[i].data.fd);
+                      if (rc < 0) {
+                          L_ERROR("[TCP Socket] Cannot create epoll\n");
+                      }
+                  }
+                celixThreadRwlock_unlock(&handle->dbLock);
             } else if (events[i].events & EPOLLRDHUP) {
                 int err = 0;
                 socklen_t len = sizeof(int);
                 rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len);
                 if (rc != 0) {
-                    L_ERROR("[TCP Socket]: ERROR read from socket \n");
+                    L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n",strerror(errno));
+                    errno = 0;
                     continue;
                 }
                 pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
             } else if (events[i].events & EPOLLERR) {
-                L_ERROR("[TCP Socket]: ERROR read from socket \n");
+                L_ERROR("[TCP Socket]:EPOLLERR  ERROR read from socket %s\n",strerror(errno));
+                errno = 0;
                 continue;
             }
         }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index c618a6f..c04e05b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -57,9 +57,10 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char* url);
 
 int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, unsigned int maxNofBuffers, unsigned int bufferSize);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
+void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned int msgIdOffset, unsigned int msgIdSize);
 
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
+  int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_tcp_msg_header_t** header, void ** buffer, unsigned int size);
 int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t* header, void* buffer, unsigned int size, int flags);
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void* payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
index e47e76d..38bff8f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_msg_header.h
@@ -20,14 +20,21 @@
 #ifndef PUBSUB_PSA_TCP_MSG_HEADER_H_
 #define PUBSUB_PSA_TCP_MSG_HEADER_H_
 
+#define MARKER_START_PATTERN       (0x56781234)
+#define MARKER_END_PATTERN         (0x67812345)
+
 typedef struct pubsub_tcp_msg_header {
-    uint32_t type; //msg type id (hash of fqn)
-    uint32_t seqNr;
-    uint8_t major;
-    uint8_t minor;
-    unsigned char originUUID[16];
-    uint64_t sendtimeSeconds; //seconds since epoch
-    uint64_t sendTimeNanoseconds; //ns since epoch
+  uint32_t marker_start;
+  uint32_t type; //msg type id (hash of fqn)
+  uint32_t seqNr;
+  uint8_t  major;
+  uint8_t  minor;
+  uint16_t padding;
+  unsigned char originUUID[16];
+  uint64_t sendtimeSeconds; //seconds since epoch
+  uint64_t sendTimeNanoseconds; //ns since epoch
+  uint32_t bufferSize; //Size of the buffer
+  uint32_t marker_end;
 } pubsub_tcp_msg_header_t;
 
-#endif
\ No newline at end of file
+#endif
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index d65c6f7..c2f12e6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -193,6 +193,13 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, psa_tcp_disConnectHandler);
     }
 
+    if (topicProperties != NULL) {
+        bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
+        long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
+        long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,   PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+        pubsub_tcpHandler_setBypassHeader(receiver->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
+    }
+
     psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
                                                                      PSA_TCP_DEFAULT_METRICS_ENABLED);
@@ -710,10 +717,10 @@ static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock)
     pubsub_tcp_topic_receiver_t *receiver = handle;
     L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url);
     if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
+    psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
     if (entry != NULL) {
-        free(entry->url);
-        free(entry);
+      entry->connected = false;
+      receiver->requestedConnections.allConnected = false;
     }
     if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 1f4b5d1..bdce2ae 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -138,8 +138,13 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+    sender->metricsEnabled   = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
+    if (topicProperties != NULL) {
+        bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties, PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
+        long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET, PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
+        long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,   PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
+        pubsub_tcpHandler_setBypassHeader(sender->socketHandler, bypassHeader, (unsigned int)msgIdOffset, (unsigned int)msgIdSize);
+    }
     /* Check if it's a static endpoint */
     bool isEndPointTypeClient = false;
     bool isEndPointTypeServer = false;
@@ -188,23 +193,16 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
                 /* Randomized part due to same bundle publishing on different topics */
                 unsigned int port = rand_range(basePort, maxPort);
                 char *url = NULL;
-                char *bindUrl = NULL;
-                if(bindIP == NULL) {
-                    asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
-                } else {
-                    asprintf(&bindUrl, "tcp://%s:%u", bindIP, port);
-                }
-
-                asprintf(&url, "tcp://%s:%u", bindIP, port);
-                int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl);
+                if (bindIP == NULL) asprintf(&url, "tcp://0.0.0.0:%u", port);
+                else asprintf(&url, "tcp://%s:%u", bindIP, port);
+                int rv = pubsub_tcpHandler_listen(sender->socketHandler, url);
                 if (rv == -1) {
-                    L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+                    L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", url, strerror(errno));
                     free(url);
                 } else {
                     sender->url = url;
                 }
                 retry++;
-                free(bindUrl);
             }
         }
     }


Mime
View raw message