celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 10/11: Fix
Date Mon, 12 Oct 2020 19:18:41 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 4cfc6ecaf6a8055bd2c514cfc11f93426aac326b
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Mon Aug 3 16:52:22 2020 +0200

    Fix
---
 .../src/pubsub_psa_tcp_constants.h                 |   2 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 369 ++++++---------------
 .../pubsub/pubsub_utils/include/pubsub_utils_url.h |   8 +-
 bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c |  22 +-
 4 files changed, 122 insertions(+), 279 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 9551c5b..9f03d13 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -31,7 +31,7 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        0
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index aae0406..411e9d2 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -32,17 +32,13 @@
 #include <array_list.h>
 #include <pthread.h>
 #if defined(__APPLE__)
-#include <sys/types.h>
 #include <sys/event.h>
 #include <sys/time.h>
 #else
 #include <sys/epoll.h>
 #endif
 #include <limits.h>
-#include <assert.h>
 #include "ctype.h"
-#include <netdb.h>
-#include <signal.h>
 #include <fcntl.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
@@ -53,12 +49,13 @@
 #define MAX_EVENTS   64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
 
-#define READ_STATE_HEADER       0u
-#define READ_STATE_PAYLOAD      1u
-#define READ_STATE_META         2u
-#define READ_STATE_FOOTER       3u
-#define READ_STATE_READY        4u
-#define READ_STATE_SYNC         5u
+#define READ_STATE_INIT         0u
+#define READ_STATE_HEADER       1u
+#define READ_STATE_PAYLOAD      2u
+#define READ_STATE_META         3u
+#define READ_STATE_FOOTER       4u
+#define READ_STATE_READY        5u
+#define READ_STATE_SYNC         6u
 
 #if defined(__APPLE__)
 #define MSG_NOSIGNAL (0)
@@ -95,7 +92,6 @@ typedef struct psa_tcp_connection_entry {
     void *footerBuffer;
     unsigned int bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
     unsigned int bufferReadReadOffset;
     unsigned int expectedBufferReadSize;
     unsigned int msgSizeReadSize;
@@ -142,8 +138,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t
*handle,
 static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t
*handle, int fd, char *url, char *external_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int
fd, unsigned int index);
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-static inline int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* buffer, int flag );
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* buffer, int flag );
 static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry);
 static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle,
psa_tcp_connection_entry_t *entry, int nextState);
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry);
@@ -264,7 +259,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) {
                 L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout:
%s", strerror(errno));
             }
         }
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if (addr) {
             rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc != 0) {
@@ -342,6 +337,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char
*url, ch
         }
         if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
     }
     return entry;
 }
@@ -418,11 +414,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url)
{
         socklen_t len = sizeof(sin);
         getsockname(fd, (struct sockaddr *) &sin, &len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname,
url_info->portnr, interface_url,
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname,
url_info->port_nr, interface_url,
                         strerror(errno));
                 close(fd);
             } else {
@@ -449,13 +445,12 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url)
{
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno));
                 entry = NULL;
             }
-            //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
-            //if (rc < 0) {
-            //    pubsub_tcpHandler_freeEntry(entry);
-            //    L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
-            //    entry = NULL;
-           // }
-
+            rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            if (rc < 0) {
+                pubsub_tcpHandler_freeEntry(entry);
+                L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno));
+                entry = NULL;
+            }
         }
         if ((rc >= 0) && (entry)) {
             celixThreadRwlock_writeLock(&handle->dbLock);
@@ -657,6 +652,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned
int si
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
         handle->maxMsgSize = size;
+        handle->maxMsgSize = 4;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -727,7 +723,7 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle,
long prio,
             if (prio > 0 && prio < 100) {
                 struct sched_param sch;
                 bzero(&sch, sizeof(struct sched_param));
-                sch.sched_priority = prio;
+                sch.sched_priority = (int)prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
                 L_INFO("Skipping configuration of thread prio to %i and thread "
@@ -772,26 +768,8 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle,
double tim
 }
 
 static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
-    }
-    if (nbytes <=0)  msgSize = nbytes;
-    return msgSize;
-}
-
-static inline
-int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* _buffer, int flag ) {
-    int nbytes = entry->expectedBufferReadSize;
+int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, int fd, void* _buffer, int flag ) {
+    int long nbytes = entry->expectedBufferReadSize;
     char* buffer = (char*)_buffer;
     while (nbytes > 0 && entry->expectedBufferReadSize > 0) {
         // Read the message header
@@ -800,9 +778,6 @@ int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connectio
         entry->bufferReadReadOffset += nbytes;
         entry->expectedBufferReadSize-= nbytes;
     }
-    if (nbytes == 0) {
-        L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u
of %u,", entry->fd, strerror(errno));
-    }
     return nbytes;
 }
 
@@ -837,7 +812,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec
         if (entry->header.header.payloadSize) {
             entry->state = READ_STATE_PAYLOAD;
             entry->bufferReadReadOffset = entry->header.header.payloadOffset;
-            entry->expectedBufferReadSize = entry->header.header.payloadSize;
+            entry->expectedBufferReadSize = (entry->header.header.payloadPartSize)
? entry->header.header.payloadPartSize : entry->header.header.payloadSize;
             // For header less messages adjust offset and msg size;
             if (!entry->headerBufferSize) {
                 entry->bufferReadReadOffset += entry->headerSize;
@@ -910,6 +885,7 @@ static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand
 
 
 
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it
in the internal structure
 // If the message is completely reassembled true is returned and the index and size have
valid values
@@ -932,182 +908,49 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
 
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize))
{
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
-        if (entry->buffer) free(entry->buffer);
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        if (entry->buffer)
+            free(entry->buffer);
         entry->buffer = malloc((size_t) handle->bufferSize);
         entry->bufferSize = handle->bufferSize;
     }
     // Read the message
-    bool validMsg = false;
-    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
-    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize,
MSG_PEEK);
-    if (nbytes > 0) {
-        // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer,
entry->headerSize, &entry->header) != CELIX_SUCCESS) {
-            // Did not receive correct header
-            // skip sync word and try to read next header
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize,
0);
-            if (!entry->headerError) {
-                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)",
entry->fd, entry->url);
-            }
-            entry->headerError = true;
-            entry->bufferReadSize = 0;
-        } else {
-            // Read header message from queue
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize,
0);
-            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                entry->headerError = false;
-                // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize)
-                    entry->bufferReadSize += nbytes;
-                // Alloc message buffers
-                if (entry->header.header.payloadSize > entry->bufferSize) {
-                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-                    if (entry->buffer)
-                        free(entry->buffer);
-                    entry->buffer = malloc((size_t) handle->bufferSize);
-                    entry->bufferSize = handle->bufferSize;
-                }
-                if (entry->header.header.metadataSize > entry->metaBufferSize) {
-                    if (entry->metaBuffer) {
-                        free(entry->metaBuffer);
-                    }
-                    entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
-                    entry->metaBufferSize = entry->header.header.metadataSize;
-                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer:
(%d, %d) \n", entry->fd,
-                           entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-                }
-
-                if (entry->header.header.payloadSize) {
-                    unsigned int offset = entry->header.header.payloadOffset;
-                    unsigned int size = entry->header.header.payloadPartSize;
-                    // For header less messages adjust offset and msg size;
-                    if (!entry->headerBufferSize) {
-                        offset = entry->headerSize;
-                        size -= offset;
-                    }
-                    // Read payload data from queue
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer,
offset, size, 0);
-                    if (nbytes > 0) {
-                        if (nbytes == size) {
-                            entry->bufferReadSize += nbytes;
-                        } else {
-                            entry->bufferReadSize = 0;
-                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer
(fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                        }
-                    }
-                }
-                if (nbytes > 0 && entry->header.header.metadataSize) {
-                    // Read meta data from queue
-                    unsigned int size = entry->header.header.metadataSize;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0,
size,0);
-                    if ((nbytes > 0) && (nbytes != size)) {
-                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd:
%d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                    }
-                }
-                // Check for end of message using, footer of message. Because of streaming
protocol
-                if (nbytes > 0) {
-                    if (entry->footerSize > 0) {
-                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0,
entry->footerSize,0);
-                        if (handle->protocol->decodeFooter(handle->protocol->handle,
entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-                            // valid footer, this means that the message is valid
-                            validMsg = true;
-                        } else {
-                            // Did not receive correct header
-                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d
(received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr,
entry->fd, entry->url);
-                            entry->bufferReadSize = 0;
-                        }
-                    } else {
-                        // No Footer, then complete message is received
-                        validMsg = true;
-                    }
-                }
-            }
-        }
-    }
-    if (nbytes > 0) {
-        entry->retryCount = 0;
-        // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) &&
validMsg) {
-            entry->bufferReadSize = 0;
-            pubsub_tcpHandler_decodePayload(handle, entry);
-        }
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
-            entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count
%u of %u,", entry->fd,
-                   strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
-        } else {
-            L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing
connection... Error: %s",
-                    entry->fd, handle->maxRcvRetryCount, strerror(errno));
-            nbytes = 0; //Return 0 as indicator to close the connection
-        }
-    }
-    celixThreadRwlock_unlock(&handle->dbLock);
-    return nbytes;
-}
-
-
-//
-// Reads data from the filedescriptor which has date (determined by epoll()) and stores it
in the internal structure
-// If the message is completely reassembled true is returned and the index and size have
valid values
-//
-int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
-    celixThreadRwlock_writeLock(&handle->dbLock);
-    psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *)
(intptr_t) fd);
-    if (entry == NULL)
-        entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
-    // Find FD entry
-    if (entry == NULL) {
-        celixThreadRwlock_unlock(&handle->dbLock);
-        return -1;
-    }
-    // If it's not connected return from function
-    if (!entry->connected) {
-        celixThreadRwlock_unlock(&handle->dbLock);
-        return -1;
-    }
-
-    // Message buffer is to small, reallocate to make it bigger
-    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize))
{
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
-        if (entry->buffer) free(entry->buffer);
-            entry->buffer = malloc((size_t) handle->bufferSize);
-            entry->bufferSize = handle->bufferSize;
-        }
-    // Read the message
-    long int nbytes = 0;
-    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
+    long int nbytes = UINT32_MAX;
+    char *header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
     if (entry->state == READ_STATE_SYNC) {
-        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
         if (nbytes > 0) {
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
     }
     if (entry->state == READ_STATE_HEADER) {
-      nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, MSG_PEEK);
-      if (nbytes >= entry->headerSize) { // Check header message buffer
-          if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer,
entry->headerSize, &entry->header) != CELIX_SUCCESS) {
-              // Did not receive correct header
-              // skip sync word and try to read next header
-              if (!entry->headerError) {
-                  L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)",
entry->fd, entry->url);
-              }
-              entry->headerError = true;
-              pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
-          } else {
-              // Read header message from queue
-              pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
-              nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0);
-              if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                 entry->headerError = false;
-                 entry->msgSizeReadSize = 0;
-                 // For headerless message, add header to bufferReadSize;
-                 if (!entry->headerBufferSize) entry->msgSizeReadSize += nbytes;
-                 pubsub_tcpHandler_setReadStateMachine(handle, entry);
-              }
-          }
-      }
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
+        if (nbytes >= entry->headerSize) { // Check header message buffer
+            if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                               header_buffer,
+                                               entry->headerSize,
+                                               &entry->header) != CELIX_SUCCESS) {
+                // Did not receive correct header
+                // skip sync word and try to read next header
+                if (!entry->headerError) {
+                    L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)",
entry->fd, entry->url);
+                }
+                entry->headerError = true;
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
+            } else {
+                // Read header message from queue
+                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+                nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
+                if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                    entry->headerError = false;
+                    entry->msgSizeReadSize = 0;
+                    // For headerless message, add header to bufferReadSize;
+                    if (!entry->headerBufferSize)
+                        entry->msgSizeReadSize += nbytes;
+                    pubsub_tcpHandler_setReadStateMachine(handle, entry);
+                }
+            }
+        }
     }
 
     if (entry->state == READ_STATE_PAYLOAD) {
@@ -1121,7 +964,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
         }
 
         // Read payload data from queue
-        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->buffer, 0);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
         if (nbytes > 0) {
             if (nbytes >= entry->header.header.payloadPartSize) {
                 entry->msgSizeReadSize += nbytes;
@@ -1142,7 +985,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
         }
 
         // Read meta data from (queue
-        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->metaBuffer,0);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0);
         if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize))
{
             entry->msgSizeReadSize += nbytes;
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
@@ -1150,7 +993,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
     }
     if (entry->state == READ_STATE_FOOTER) {
         // Check for end of message using, footer of message. Because of streaming protocol
-        nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->footerBuffer,
0);
+        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,
0);
         if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer,
entry->footerSize, &entry->header) == CELIX_SUCCESS) {
             // valid footer, this means that the message is valid
             pubsub_tcpHandler_setReadStateMachine(handle, entry);
@@ -1174,8 +1017,9 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
             entry->retryCount = 0;
         } else if (entry->retryCount < handle->maxRcvRetryCount) {
             entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count
%u of %u,", entry->fd,
-                   strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+            L_WARN(
+                "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s,
Retry count %u of %u.",
+                entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
         } else {
             L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing
connection... Error: %s",
                     entry->fd, handle->maxRcvRetryCount, strerror(errno));
@@ -1183,7 +1027,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
-    return nbytes;
+    return (int)nbytes;
 }
 
 
@@ -1421,8 +1265,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                     if (entry->retryCount < handle->maxSendRetryCount) {
                         entry->retryCount++;
                         L_ERROR(
-                            "[TCP Socket] Failed to send message (fd: %d), error: %s. try
again. Retry count %u of %u, ",
-                            entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry
count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount,
errno, strerror(errno));
                     } else {
                         L_ERROR(
                             "[TCP Socket] Failed to send message (fd: %d) after %u retries!
Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
@@ -1560,49 +1404,48 @@ void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle,
int fd) {
 //
 static inline
 void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
-  int rc = 0;
-  if (handle->efd >= 0) {
-    int nof_events = 0;
-    //  Wait for events.
-    struct kevent events[MAX_EVENTS];
-    struct timespec ts = {handle->timeout / 1000, (handle->timeout  % 1000) * 1000000};
-    nof_events = kevent (handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout
? &ts : NULL);
-    if (nof_events < 0) {
-      if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
-      } else
-        L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno));
-    }
-    for (int i = 0; i < nof_events; i++) {
-      hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
-      psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
-      while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (events[i].ident == entry->fd)
-          pendingConnectionEntry = entry;
-      }
-      if (pendingConnectionEntry) {
-        int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
-        pubsub_tcpHandler_connectionHandler(handle, fd);
-      } else if (events[i].filter & EVFILT_READ) {
-        int rc = pubsub_tcpHandler_read(handle, events[i].ident);
-        if (rc == 0) pubsub_tcpHandler_close(handle, events[i].ident);
-      } else if (events[i].flags & EV_EOF) {
-        int err = 0;
-        socklen_t len = sizeof(int);
-        rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len);
-        if (rc != 0) {
-          L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
-          continue;
+    int rc = 0;
+    if (handle->efd >= 0) {
+        int nof_events = 0;
+        //  Wait for events.
+        struct kevent events[MAX_EVENTS];
+        struct timespec ts = {handle->timeout / 1000, (handle->timeout  % 1000) * 1000000};
+        nof_events = kevent (handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout
? &ts : NULL);
+        if (nof_events < 0) {
+            if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+            } else
+                L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno));
+        }
+        for (int i = 0; i < nof_events; i++) {
+            hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
+            psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
+            while (hashMapIterator_hasNext(&iter)) {
+                psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+                if (events[i].ident == entry->fd)
+                    pendingConnectionEntry = entry;
+            }
+            if (pendingConnectionEntry) {
+                int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry);
+                pubsub_tcpHandler_connectionHandler(handle, fd);
+            } else if (events[i].filter & EVFILT_READ) {
+                rc = pubsub_tcpHandler_read(handle, events[i].ident);
+                if (rc == 0) pubsub_tcpHandler_close(handle, events[i].ident);
+            } else if (events[i].flags & EV_EOF) {
+                int err = 0;
+                socklen_t len = sizeof(int);
+                rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len);
+                if (rc != 0) {
+                    L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno));
+                    continue;
+                }
+                pubsub_tcpHandler_close(handle, events[i].ident);
+            } else if (events[i].flags & EV_ERROR) {
+                L_ERROR("[TCP Socket]:EPOLLERR  ERROR read from socket %s\n", strerror(errno));
+                pubsub_tcpHandler_close(handle, events[i].ident);
+                continue;
+            }
         }
-        pubsub_tcpHandler_close(handle, events[i].ident);
-      } else if (events[i].flags & EV_ERROR) {
-        L_ERROR("[TCP Socket]:EPOLLERR  ERROR read from socket %s\n", strerror(errno));
-        pubsub_tcpHandler_close(handle, events[i].ident);
-        continue;
-      }
     }
-  }
-  return;
 }
 
 #else
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@ typedef struct pubsub_utils_url {
   char *url;
   char *protocol;
   char *hostname;
-  unsigned int portnr;
+  unsigned int port_nr;
   char *uri;
   char *interface;
-  unsigned int interface_portnr;
+  unsigned int interface_port_nr;
   char *interface_url;
 } pubsub_utils_url_t;
 
 struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
 char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
 bool pubsub_utils_url_is_multicast(char *hostname);
 char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) {
     return inp;
 }
 
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) {
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) {
     struct hostent *hp;
     struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
     bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info)
{
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->portnr = portDigits;
+                    url_info->port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info)
{
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->interface_portnr = portDigits;
+                    url_info->interface_port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->interface);
             url_info->interface = ip;
         }
-        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
         url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
         free(m_sin);
         pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info);
         free(url_info->interface);
         url_info->interface = interface_url_info.hostname;
-        url_info->interface_portnr = interface_url_info.portnr;
+        url_info->interface_port_nr = interface_url_info.port_nr;
     }
 
     if (url_info->hostname) {
@@ -306,11 +306,11 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->hostname);
             url_info->hostname = ip;
         }
-        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
         free(url_info->hostname);
         free(sin);
-        url_info->portnr = 0;
+        url_info->port_nr = 0;
         url_info->hostname = NULL;
         pubsub_utils_url_parse_url(url_info->url, url_info);
     }
@@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) {
     url_info->hostname = NULL;
     url_info->protocol = NULL;
     url_info->interface = NULL;
-    url_info->portnr = 0;
-    url_info->interface_portnr = 0;
+    url_info->port_nr = 0;
+    url_info->interface_port_nr = 0;
     free(url_info);
 }


Mime
View raw message