celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 01/01: refactor read and write function
Date Fri, 05 Jun 2020 13:03:07 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 9cdf93bf575aa14323ee4ec45fae9b1d02c18492
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Fri Jun 5 15:02:05 2020 +0200

    refactor read and write function
---
 .../src/pubsub_psa_tcp_constants.h                 |   2 +
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 368 +++++++++++----------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   3 +-
 .../src/pubsub_tcp_topic_receiver.c                |   9 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   5 +-
 5 files changed, 208 insertions(+), 179 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 3e7a7b3..6026212 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -61,9 +61,11 @@
 //Time-out settings are only for BLOCKING connections
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY       "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT"
 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT   5.0
+#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT   0.0
 
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY      "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT"
 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT  5.0
+#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT  0.0
 
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_ADMIN_TYPE                   "tcp"
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 93f0358..7b13b6f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -76,25 +76,28 @@ typedef struct psa_tcp_connection_entry {
     struct sockaddr_in addr;
     socklen_t len;
     bool connected;
+    bool headerError;
     pubsub_protocol_message_t header;
     unsigned int syncSize;
     unsigned int headerSize;
     unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
     void *headerBuffer;
+    void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
+    unsigned int bufferReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
     struct msghdr msg;
     size_t msg_iovlen;        /* Number of elements in the vector.  */
     unsigned int retryCount;
+    unsigned int seqNr;
 } psa_tcp_connection_entry_t;
 
 //
 // Handle administration
 //
 struct pubsub_tcpHandler {
-    unsigned int readSeqNr;
     celix_thread_rwlock_t dbLock;
     unsigned int timeout;
     hash_map_t *connection_url_map;
@@ -137,9 +140,9 @@ static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t
*entry
 
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
 
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
 
-static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd);
+static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry);
 
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
 
@@ -337,6 +340,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char
*url, ch
             entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
             entry->msg_iovlen++;
         }
+        entry->footerBuffer = calloc(sizeof(char), entry->headerSize);
         entry->buffer = calloc(sizeof(char), entry->bufferSize);
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
         entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
@@ -378,6 +382,12 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
             entry->headerBuffer = NULL;
             entry->headerBufferSize = 0;
         }
+
+        if (entry->footerBuffer) {
+            free(entry->footerBuffer);
+            entry->footerBuffer = NULL;
+        }
+
         if (entry->metaBuffer) {
             free(entry->metaBuffer);
             entry->metaBuffer = NULL;
@@ -438,7 +448,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url)
{
 #else
             struct epoll_event event;
             bzero(&event,  sizeof(struct epoll_event)); // zero the struct
-            event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+            event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
             event.data.fd = entry->fd;
             rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
 #endif
@@ -520,7 +530,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
                                       psa_tcp_connection_entry_t *entry) {
     int rc = 0;
     if (handle != NULL && entry != NULL) {
-        fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", entry->url);
+        L_INFO("[TCP Socket] Close interface url: %s: \n", entry->url);
         hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd);
         if ((handle->efd >= 0)) {
 #if defined(__APPLE__)
@@ -546,8 +556,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle,
 //
 // Make accept file descriptor non blocking
 //
-static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
-                                                    int fd) {
+static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) {
     int rc = 0;
     int flags = fcntl(fd, F_GETFL, 0);
     if (flags == -1)
@@ -714,13 +723,9 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle,
long prio,
                 sch.sched_priority = prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
-                printf("Skipping configuration of thread prio to %i and thread "
+                L_INFO("Skipping configuration of thread prio to %i and thread "
                        "scheduling to %s. No permission\n",
                        (int) prio, sched);
-                celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_INFO,
-                              "Skipping configuration of thread prio to %i and thread "
-                              "scheduling to %s. No permission\n",
-                              (int) prio, sched);
             }
             celixThreadRwlock_unlock(&handle->dbLock);
         }
@@ -759,14 +764,51 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle,
double tim
     }
 }
 
+static inline
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
+    int expectedReadSize = size;
+    int nbytes = size;
+    int msgSize = 0;
+    char* buffer = (char*)_buffer;
+    while (nbytes > 0 && expectedReadSize > 0) {
+        // Read the message header
+        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
+        // Update buffer administration
+        offset += nbytes;
+        expectedReadSize -= nbytes;
+        msgSize += nbytes;
+    }
+    if (nbytes <=0)  msgSize = nbytes;
+    return msgSize;
+}
+
+
+static inline
+void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry) {
+
+  if (entry->header.header.payloadSize > 0) {
+    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer,
entry->header.header.payloadSize, &entry->header);
+  }
+  if (entry->header.header.metadataSize > 0) {
+    handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
+                                     entry->header.header.metadataSize, &entry->header);
+  }
+  if (handle->processMessageCallback && entry->header.payload.payload != NULL
&& entry->header.payload.length) {
+    struct timespec receiveTime;
+    clock_gettime(CLOCK_REALTIME, &receiveTime);
+    bool releaseEntryBuffer = false;
+    handle->processMessageCallback(handle->processMessagePayload, &entry->header,
&releaseEntryBuffer, &receiveTime);
+    if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0);
+  }
+}
+
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index,
bool *readMsg) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     celixThreadRwlock_writeLock(&handle->dbLock);
-    *index = 0;
-    *readMsg = false;
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *)
(intptr_t) fd);
     if (entry == NULL)
         entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
@@ -784,93 +826,105 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize))
{
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
-        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-        if (buffer) {
-            entry->buffer = buffer;
+        if (entry->buffer) free(entry->buffer);
+            entry->buffer = malloc((size_t) handle->bufferSize);
             entry->bufferSize = handle->bufferSize;
         }
-    }
-
     // Read the message
-    entry->msg.msg_iovlen = 0;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize)
? entry->headerBuffer
-                                                                                   : entry->buffer;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
-    entry->msg.msg_iovlen++;
-    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+    bool validMsg = false;
+    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize,
MSG_PEEK);
     if (nbytes > 0) {
-        entry->msg.msg_iovlen = 0;
-        if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) {
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-
-        } else if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base,
-                                                  entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len,
&entry->header) !=
-            CELIX_SUCCESS) {
-            entry->msg.msg_iov[0].iov_len = entry->syncSize;
-            nbytes = recvmsg(fd, &entry->msg, 0);
-            if (nbytes > 0)
-                entry->retryCount = 0;
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-        }
-        if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-            char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-            if (buffer) {
-                entry->buffer = buffer;
-                entry->bufferSize = handle->bufferSize;
-            }
-        }
-        if (entry->header.header.metadataSize > entry->metaBufferSize) {
-            char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize);
-            if (buffer) {
-                entry->metaBuffer = buffer;
-                entry->metaBufferSize = entry->header.header.metadataSize;
-                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d,
%d) \n", entry->fd,
-                       entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-            }
-        }
-
-        if (entry->headerBufferSize)
-            entry->msg.msg_iovlen++;
-        if (entry->header.header.payloadSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize;
-            entry->msg.msg_iovlen++;
-        }
-        if (entry->header.header.metadataSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize;
-            entry->msg.msg_iovlen++;
-        }
-        nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL);
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
-            entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry
count %u of %u,",
-                   entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+        // Check header message buffer
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           header_buffer,
+                                           entry->headerSize,
+                                           &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize,
0);
+            if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header
(fd: %d) (url: %s)", entry->fd, entry->url);
+            entry->headerError = true;
+            entry->bufferReadSize = 0;
         } else {
-            L_ERROR(
-                "[TCP Socket] Failed to receive message header (fd: %d) after %u retries!
Closing connection... Error: %s",
-                entry->fd,
-                handle->maxRcvRetryCount,
-                strerror(errno));
-            nbytes = 0; //Return 0 as indicator to close the connection
+            // Read header message from queue
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize,
0);
+            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                entry->headerError = false;
+                // For headerless message, add header to bufferReadSize;
+                if (!entry->headerBufferSize)
+                    entry->bufferReadSize += nbytes;
+                // Alloc message buffers
+                if (entry->header.header.payloadSize > entry->bufferSize) {
+                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                    if (entry->buffer)
+                        free(entry->buffer);
+                    entry->buffer = malloc((size_t) handle->bufferSize);
+                    entry->bufferSize = handle->bufferSize;
+                }
+                if (entry->header.header.metadataSize > entry->metaBufferSize) {
+                    if (entry->metaBuffer) {
+                        free(entry->metaBuffer);
+                        entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
+                        entry->bufferSize = handle->bufferSize;
+                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer:
(%d, %d) \n", entry->fd,
+                               entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
+                    }
+                }
+
+                if (entry->header.header.payloadSize) {
+                    unsigned int offset = entry->header.header.payloadOffset;
+                    unsigned int size = entry->header.header.payloadPartSize;
+                    // For header less messages adjust offset and msg size;
+                    if (!entry->headerBufferSize) {
+                        offset = entry->headerSize;
+                        size -= offset;
+                    }
+                    // Read payload data from queue
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer,
offset, size, 0);
+                    if (nbytes > 0) {
+                        if (nbytes == size) {
+                            entry->bufferReadSize += nbytes;
+                        } else {
+                            entry->bufferReadSize = 0;
+                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer
(fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                        }
+                    }
+                }
+                if (nbytes > 0 && entry->header.header.metadataSize) {
+                    // Read meta data from queue
+                    unsigned int size = entry->header.header.metadataSize;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0,
size,0);
+                    if ((nbytes > 0) && (nbytes != size)) {
+                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd:
%d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                    }
+                }
+                // Check for end of message using, header of next message. Because of streaming protocol
+                // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header)
+                if (nbytes > 0) {
+                    pubsub_protocol_message_t header;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,
0, entry->headerSize, MSG_PEEK);
+                    if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                                 entry->footerBuffer,
+                                                 entry->headerSize,
+                                                 &header) == CELIX_SUCCESS) {
+                        // valid header for next buffer, this means that the message is valid
+                        validMsg = true;
+                    } else {
+                        // Did not receive correct header
+                        L_ERROR("[TCP Socket] Failed to decode next message header seq %d
(received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr,
entry->fd, entry->url);
+                        entry->bufferReadSize = 0;
+                    }
+                }
+            }
         }
     }
     if (nbytes > 0) {
         entry->retryCount = 0;
-        unsigned int msgSize = 0;
-        for (int i = 0; i < entry->msg.msg_iovlen; i++) {
-            msgSize += entry->msg.msg_iov[i].iov_len;
-        }
-        if (nbytes == msgSize) {
-            *readMsg = true;
-        } else {
-            L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d
= msgSize %d", entry->fd,
-                    nbytes, msgSize);
+        // Check if complete message is received
+        if ((entry->bufferReadSize >= entry->header.header.payloadSize) &&
validMsg) {
+            entry->bufferReadSize = 0;
+            pubsub_tcpHandler_decodePayload(handle, entry);
         }
     } else {
         if (entry->retryCount < handle->maxRcvRetryCount) {
@@ -887,34 +941,6 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
     return nbytes;
 }
 
-//
-// Read out the message which is indicated available by the largeUdp_dataAvailable function
-//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__
((__unused__)),
-                           pubsub_protocol_message_t **header) {
-    int result = 0;
-    celixThreadRwlock_readLock(&handle->dbLock);
-    psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *)
(intptr_t) fd);;
-    if (entry == NULL)
-        entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
-    if (entry == NULL)
-        result = -1;
-    if (entry)
-        result = (!entry->connected) ? -1 : result;
-    if (!result) {
-        if (entry->header.header.payloadSize > 0) {
-            handle->protocol->decodePayload(handle->protocol->handle, entry->buffer,
entry->header.header.payloadSize,
-                                            &entry->header);
-        }
-        if (entry->header.header.metadataSize > 0) {
-            handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
-                                             entry->header.header.metadataSize, &entry->header);
-        }
-        *header = &entry->header;
-    }
-    celixThreadRwlock_unlock(&handle->dbLock);
-    return result;
-}
 
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
                                         pubsub_tcpHandler_processMessage_callback_t processMessageCallback)
{
@@ -950,6 +976,44 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t
*handle, v
     return result;
 }
 
+static inline
+int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, struct msghdr* msg, unsigned int size, int flag ) {
+  int nbytes = 0;
+  int msgSize = 0;
+  if (entry->fd >= 0 && size && msg->msg_iovlen) {
+    int expectedReadSize = size;
+    unsigned int offset = 0;
+    nbytes = size;
+    while (nbytes > 0 && expectedReadSize > 0) {
+      // Read the message header
+      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
+      // Update admin
+      expectedReadSize -= nbytes;
+      msgSize += nbytes;
+      // Not all written
+      if (expectedReadSize && nbytes > 0) {
+        unsigned int readSize = 0;
+        unsigned int readIndex = 0;
+        unsigned int i = 0;
+        for (i = 0; i < msg->msg_iovlen; i++) {
+          if (nbytes < msg->msg_iov[i].iov_len) {
+            readIndex = i;
+            break;
+          }
+          readSize+= msg->msg_iov[i].iov_len;
+        }
+        msg->msg_iov = &msg->msg_iov[readIndex];
+        msg->msg_iovlen -= readIndex;
+        char* buffer = (char*)msg->msg_iov->iov_base;
+        offset = nbytes - readSize;
+        msg->msg_iov->iov_base = &buffer[offset];
+        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
+      }
+    }
+  }
+  if (nbytes <=0)  msgSize = nbytes;
+  return msgSize;
+}
 //
 // Write large data to TCP. .
 //
@@ -963,6 +1027,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->connected) continue;
             void *payloadData = NULL;
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
@@ -973,6 +1038,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 }
             }
 
+            message->header.seqNr = entry->seqNr;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
             message->header.payloadOffset = 0;
@@ -987,15 +1053,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             message->header.metadataSize = metadataSize;
 
             size_t msgSize = 0;
-            long int nbytes = 0;
             struct msghdr msg;
             struct iovec msg_iov[IOV_MAX];
+            memset(&msg, 0x00, sizeof(struct msghdr));
             msg.msg_name = &entry->addr;
             msg.msg_namelen = entry->len;
             msg.msg_flags = flags;
-            msg.msg_control = NULL;
-            msg.msg_controllen = 0;
-            msg.msg_iovlen = 0;
             msg.msg_iov = msg_iov;
 
             // Write generic seralized payload in vector buffer
@@ -1044,10 +1107,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
               L_ERROR("[TCP Socket] No header buffer is generated");
               msg.msg_iovlen = 0;
             }
-            nbytes = 0;
-            if (entry->fd >= 0 && msgSize && msg.msg_iovlen) {
-                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
-            }
+            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize,
flags);
             //  When a specific socket keeps reporting errors can indicate a subscriber
             //  which is not active anymore, the connection will remain until the retry
             //  counter exceeds the maximum retry count.
@@ -1068,7 +1128,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             } else if (msgSize) {
                 entry->retryCount = 0;
                 if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket]  MsgSize not correct: %d != %d\n", msgSize, nbytes);
+                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n",
entry->seqNr, msgSize, nbytes,  strerror(errno));
                 }
             }
             // Release data
@@ -1082,6 +1142,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             if (metadataData) {
                 free(metadataData);
             }
+            entry->seqNr++;
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
@@ -1168,51 +1229,10 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect
 }
 
 //
-// Handle sockets reads (blocking)
-//
-static inline
-void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd) {
-    unsigned int index = 0;
-    bool readMsg = false;
-    int rc = pubsub_tcpHandler_dataAvailable(handle, fd, &index, &readMsg);
-    if (rc <= 0) {
-        // close connection.
-        if (rc == 0)
-            pubsub_tcpHandler_close(handle, fd);
-        return;
-    }
-    if (readMsg) {
-        // Handle data
-        pubsub_protocol_message_t *header = NULL;
-        rc = pubsub_tcpHandler_read(handle, fd, index, &header);
-        if (rc < 0)
-            return;
-        celixThreadRwlock_readLock(&handle->dbLock);
-        if (handle->processMessageCallback && header != NULL && header->payload.payload
!= NULL &&
-            header->payload.length) {
-            struct timespec receiveTime;
-            clock_gettime(CLOCK_REALTIME, &receiveTime);
-            bool releaseEntryBuffer = false;
-            handle->processMessageCallback(handle->processMessagePayload, header, &releaseEntryBuffer,
&receiveTime);
-            if (releaseEntryBuffer)
-                pubsub_tcpHandler_releaseEntryBuffer(handle, fd, index);
-        }
-        celixThreadRwlock_unlock(&handle->dbLock);
-    }
-}
-
-//
 // Handle sockets connection (sender)
 //
 static inline
 void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) {
-    int err = 0;
-    socklen_t len = sizeof(int);
-    int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
-    if (rc != 0) {
-        L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno));
-        return;
-    }
     celixThreadRwlock_readLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *)
(intptr_t) fd);
     if (entry)
@@ -1255,7 +1275,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
         int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
         pubsub_tcpHandler_connectionHandler(handle, fd);
       } else if (events[i].filter & EVFILT_READ) {
-        pubsub_tcpHandler_readHandler(handle, events[i].ident);
+        int rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+        if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
       } else if (events[i].flags & EV_EOF) {
         int err = 0;
         socklen_t len = sizeof(int);
@@ -1304,7 +1325,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
                pubsub_tcpHandler_connectionHandler(handle, fd);
             } else if (events[i].events & EPOLLIN) {
-                pubsub_tcpHandler_readHandler(handle, events[i].data.fd);
+                rc = pubsub_tcpHandler_read(handle, events[i].data.fd);
+                if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd);
             } else if (events[i].events & EPOLLRDHUP) {
                 int err = 0;
                 socklen_t len = sizeof(int);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index 260edc1..ed4581c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -68,8 +68,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 
-int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index,
bool *readMsg);
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_protocol_message_t
**header);
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
                             pubsub_protocol_message_t *message,
                             struct iovec *msg_iovec,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 8cbf8fc..0bd51c5 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
+    bool isEndpoint = false;
     bool isServerEndPoint = false;
 
     /* Check if it's a static endpoint */
@@ -167,6 +168,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS,
NULL);
         const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE,
NULL);
         if (endPointType != NULL) {
+            isEndpoint = true;
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
                 staticClientEndPointUrls = staticConnectUrls;
@@ -207,8 +209,9 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED,
NULL);
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
-        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
-                                                         PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
+        double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,

+                                                        (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT
:
+                                                                        PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT);
         long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
                                                               PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
         long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
@@ -789,4 +792,4 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major,
uint16_t
     }
 
     return check;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 69c862a..47dc888 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -145,6 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     }
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
                                                                    PSA_TCP_DEFAULT_METRICS_ENABLED);
+    bool isEndpoint = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
     const char *discUrl = NULL;
@@ -155,6 +156,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         /* Check if it's a static endpoint */
         const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE,
NULL);
         if (endPointType != NULL) {
+            isEndpoint = true;
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
                 staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS,
NULL);
@@ -192,7 +194,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
         double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                      PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT
:
+                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);


Mime
View raw message