celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3 updated: FIx
Date Mon, 12 Oct 2020 19:19:05 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
by this push:
     new fe4cef0   FIx
fe4cef0 is described below

commit fe4cef0c141de6ec0575f5dec1a5040a6916b91d
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Mon Oct 12 21:16:21 2020 +0200

     FIx
---
 CMakeLists.txt                                     |   2 +-
 .../publisher/private/src/pubsub_publisher.c       |  24 ++-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 194 ++++++++++-----------
 3 files changed, 108 insertions(+), 112 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 788200f..598600b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -43,7 +43,7 @@ set(ENABLE_MORE_WARNINGS OFF)
 
 # Set C specific flags
 set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -fPIC ${CMAKE_C_FLAGS}")
-set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
+#set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
 
 # Set C++ specific flags
 set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti ${CMAKE_CXX_FLAGS}")
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 92e0e65..03857dd 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -54,16 +54,15 @@ static void* send_thread(void* arg) {
     //poi_t point = calloc(1,sizeof(*point));
     location_t place = calloc(1, sizeof(*place));
 
-    char *desc = calloc(1, sizeof(char)); //calloc(64, sizeof(char));
-    //snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+    char *desc = calloc(64, sizeof(char));
+    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
 
-    char *name = calloc(1, sizeof(char));//calloc(64, sizeof(char));
-    //snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+    char *name = calloc(64, sizeof(char));
+    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
 
     place->name = name;
     place->description = desc;
-    //place->extra = "extra value";
-    place->extra = calloc(1, 1);
+    place->extra = "extra value";
     printf("TOPIC : %s\n", st_struct->topic);
 
     unsigned int msgId = 0;
@@ -78,21 +77,18 @@ static void* send_thread(void* arg) {
         if (msgId > 0) {
             place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
             place->position.lon = randCoordinate(MIN_LON, MAX_LON);
-            //int nr_char = (int) randCoordinate(5, 100000);
-            int nr_char = 1;//(int) randCoordinate(5, 20);
+            int nr_char = (int) randCoordinate(5, 100000);
             place->data = calloc(nr_char, 1);
             for (int i = 0; i < (nr_char - 1); i++) {
                 place->data[i] = i % 10 + '0';
             }
             place->data[nr_char - 1] = '\0';
             if (publish_svc->send) {
-                celix_properties_t *metadata = NULL;
-                //celix_properties_t *metadata = celix_properties_create();
-                //celix_properties_set(metadata, "Key", "Value");
-
+                celix_properties_t *metadata = celix_properties_create();
+                celix_properties_set(metadata, "Key", "Value");
                 if (publish_svc->send(publish_svc->handle, msgId, place, metadata)
== 0) {
-                   // printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
-                   //        place->position.lat, place->position.lon, place->name,
place->description, nr_char);
+                    printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                           place->position.lat, place->position.lon, place->name,
place->description, nr_char);
                 }
             } else {
                 printf("No send for %s\n", st_struct->topic);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 2749794..6827e52 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -640,7 +640,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned
int si
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
         handle->maxMsgSize = size;
-        handle->maxMsgSize = 16;
+        handle->maxMsgSize = 4;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -887,7 +887,7 @@ static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it
in the internal structure
 // If the message is completely reassembled true is returned and the index and size have
valid values
 //
-int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
     celixThreadRwlock_writeLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *)
(intptr_t) fd);
     if (entry == NULL)
@@ -914,116 +914,110 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
             entry->bufferSize = handle->bufferSize;
         }
     }
+    struct msghdr msg;
+    struct iovec msg_iov[IOV_MAX];
+    memset(&msg, 0x00, sizeof(struct msghdr));
+    msg.msg_iov = msg_iov;
+
     // Read the message
-    // Read the message
-    entry->msg.msg_iovlen = 0;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize)
? entry->headerBuffer
-                                                                                   : entry->buffer;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
-    entry->msg.msg_iovlen++;
-    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
-    long int nbytes = UINT32_MAX;
-    char *header_buffer = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer :
entry->buffer;
-    if (entry->state == READ_STATE_SYNC) {
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
-        nbytes = recv(fd, header_buffer, entry->expectedBufferReadSize, flag | MSG_NOSIGNAL);
-        if (nbytes && (entry->expectedBufferReadSize <= 0))  {
-            pubsub_tcpHandler_setReadStateMachine(handle, entry);
-        }
-    }
-    if (entry->state == READ_STATE_HEADER) {
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK);
-        if (nbytes && (entry->expectedBufferReadSize <= 0))  { // Check header
message buffer
-            if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                               header_buffer,
-                                               entry->headerSize,
-                                               &entry->header) != CELIX_SUCCESS) {
-                // Did not receive correct header
-                // skip sync word and try to read next header
-                if (!entry->headerError) {
-                    L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)",
entry->fd, entry->url);
-                }
-                entry->headerError = true;
-                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC);
-            } else {
-                // Read header message from queue
-                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
-                nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0);
-                if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                    entry->headerError = false;
-                    entry->msgSizeReadSize = 0;
-                    // For headerless message, add header to bufferReadSize;
-                    if (!entry->readHeaderBufferSize) entry->msgSizeReadSize += nbytes;
-                    pubsub_tcpHandler_setReadStateMachine(handle, entry);
+    msg.msg_iovlen = 0;
+    msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer
: entry->buffer;
+    msg.msg_iov[msg.msg_iovlen].iov_len  = entry->headerSize;
+    msg.msg_iovlen++;
+    long int msgSize = 0;
+    long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL);
+    if (nbytes >= entry->headerSize) {
+        msg.msg_iovlen--;
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           msg.msg_iov[msg.msg_iovlen].iov_base,
+                                           msg.msg_iov[msg.msg_iovlen].iov_len,
+                                           &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            if (!entry->headerError) {
+                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)",
entry->fd, entry->url);
+            }
+            entry->headerError = true;
+            msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize;
+            msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            msg.msg_iovlen++;
+        } else {
+            //printf("PayRR[%d]: Offset %d, %d total: %d uel %s\n", entry->header.header.seqNr,
(int)entry->header.header.payloadOffset, (int)entry->header.header.payloadPartSize,
(int)entry->header.header.payloadSize, entry->url);
+            // Alloc message buffers
+            if (entry->header.header.payloadSize > entry->bufferSize) {
+                handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
+                char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
+                if (buffer) {
+                    entry->buffer = buffer;
+                    entry->bufferSize = handle->bufferSize;
                 }
             }
-        }
-    }
-
-    if (entry->state == READ_STATE_PAYLOAD) {
-        // Alloc message buffers
-        if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize
+ PADDING_BUFFER_SIZE);
-            if (entry->buffer) {
-                free(entry->buffer);
+            if (entry->header.header.payloadOffset ==0 ){
+                memset(entry->buffer, 0x00, entry->bufferSize);
             }
-            entry->buffer = malloc((size_t) handle->bufferSize);
-            entry->bufferSize = handle->bufferSize;
-        }
-
-        //if (entry->header.header.isLastSegment) entry->expectedBufferReadSize+=4;
-        // Read payload data from queue
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0);
-        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
-            entry->msgSizeReadSize += nbytes;
-            pubsub_tcpHandler_setReadStateMachine(handle, entry);
-        }
-    }
 
-    if (entry->state == READ_STATE_META) {
-        if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
-            if (entry->readMetaBuffer) {
-                free(entry->readMetaBuffer);
-                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d,
%d) \n", entry->fd,
-                       entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
+            entry->headerError = false;
+            if (entry->readHeaderBufferSize) {
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.payloadPartSize) {
+                char* buffer = entry->buffer;
+                msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
+                msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.payloadPartSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.metadataSize) {
+                if (entry->header.header.metadataSize > entry->readMetaBufferSize)
{
+                    char *buffer = realloc(entry->readMetaBuffer, (size_t) entry->header.header.metadataSize);
+                    if (buffer) {
+                        entry->readMetaBuffer = buffer;
+                        entry->readMetaBufferSize = entry->header.header.metadataSize;
+                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer:
(%d, %d) \n", entry->fd,
+                               entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize);
+                    }
+                }
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len  = entry->header.header.metadataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->readFooterSize) {
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
             }
-            entry->readMetaBufferSize = entry->header.header.metadataSize + PADDING_BUFFER_SIZE;
-            entry->readMetaBuffer     = malloc((size_t) entry->readMetaBufferSize);
-        }
-
-        // Read meta data from (queue
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readMetaBuffer,0);
-        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
-            entry->msgSizeReadSize += nbytes;
-            pubsub_tcpHandler_setReadStateMachine(handle, entry);
         }
+        nbytes = recvmsg(fd, &msg,  MSG_NOSIGNAL);
     }
-    if (entry->state == READ_STATE_FOOTER) {
-        // Check for end of message using, footer of message. Because of streaming protocol
-        if (!entry->readFooterBuffer) entry->readFooterBuffer = malloc(entry->readFooterSize);
-        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readFooterBuffer,
0);
-        if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+    if ((nbytes >= msgSize)&&(!entry->headerError))  {
+        bool valid = true;
+        if (entry->readFooterSize) {
             if (handle->protocol->decodeFooter(handle->protocol->handle,
                                                entry->readFooterBuffer,
                                                entry->readFooterSize,
-                                               &entry->header) == CELIX_SUCCESS) {
-                // valid footer, this means that the message is valid
-                pubsub_tcpHandler_setReadStateMachine(handle, entry);
-            } else {
+                                               &entry->header) != CELIX_SUCCESS) {
+
                 // Did not receive correct footer
                 L_ERROR(
                     "[TCP Socket] Failed to decode message footer seq %d (received corrupt
message, transmit buffer full?) (fd: %d) (url: %s)",
                     entry->header.header.seqNr,
                     entry->fd,
                     entry->url);
-                pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
+                valid = false;
             }
         }
-    }
-    if (entry->state == READ_STATE_READY) {
-        // Complete message is received
-        pubsub_tcpHandler_decodePayload(handle, entry);
-        pubsub_tcpHandler_setReadStateMachine(handle, entry);
+        if (!entry->header.header.isLastSegment) {
+            // Not last Segment of message
+            valid = false;
+        }
+
+        if (valid) {
+            // Complete message is received
+            pubsub_tcpHandler_decodePayload(handle, entry);
+        }
     }
 
     if (nbytes > 0) {
@@ -1051,7 +1045,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it
in the internal structure
 // If the message is completely reassembled true is returned and the index and size have
valid values
 //
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
     celixThreadRwlock_writeLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *)
(intptr_t) fd);
     if (entry == NULL)
@@ -1280,11 +1274,11 @@ int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connectio
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message,
struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
     int nofConnToClose = 0;
     if (handle) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
         size_t max_msg_iov_len = IOV_MAX - 3; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
@@ -1294,6 +1288,10 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
                 handle->protocol->encodePayload(handle->protocol->handle, message,
&payloadData, &payloadSize);
+                //char* b = (char*)payloadData;
+                //for (int i = 0; i < payloadSize; i++) {
+                //    b[i] = '0' + (i % 10);
+                //}
             } else {
                 for (size_t i = 0; i < msg_iov_len; i++) {
                     payloadSize += msgIoVec[i].iov_len;
@@ -1371,7 +1369,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                         msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize),
entry->maxMsgSize);
                         msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                         message->header.payloadPartSize = msgPartSize;
-                        message->header.payloadOffset = msgPayloadSize;
+                        message->header.payloadOffset   = msgPayloadSize;
+                        //printf("Pay[%d: MsgId: %d,  Offset %d, %d (%d) url: %s\n ", message->header.seqNr,
message->header.msgId, (int)msgPayloadSize, (int)msgPartSize, (int)message->header.payloadSize,
entry->url);
                         msgPayloadSize += msg.msg_iov[msg.msg_iovlen].iov_len;
                         msgSize = msgPayloadSize;
                     } else {
@@ -1489,8 +1488,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message
                 }
             }
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock
to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1661,6 +1660,7 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                 L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
         }
         for (int i = 0; i < nof_events; i++) {
+            size_t size = hashMap_size(handle->interface_fd_map);
             hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map);
             psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
             while (hashMapIterator_hasNext(&iter)) {


Mime
View raw message