celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbul...@apache.org
Subject [celix] 08/11: Merge branch 'master' into feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
Date Tue, 06 Oct 2020 19:04:52 GMT
This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit a35435e8df3347151aa9db6b1cd2eb6731ff1a1c
Merge: dbf6bdc 3be06db
Author: Roy Bulter <roybulter@gmail.com>
AuthorDate: Mon Aug 3 12:43:47 2020 +0200

    Merge branch 'master' into feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2
    
    # Conflicts:
    #	bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c

 .github/workflows/coverity-scan.yml                |  57 +++
 CHANGES.md                                         |   4 +
 bundles/deployment_admin/README.md                 |   4 +
 bundles/device_access/README.md                    |   4 +
 .../device_access/example/base_driver/README.md    |   4 +
 .../example/consuming_driver/README.md             |   4 +
 .../example/refining_driver/README.md              |   4 +
 bundles/http_admin/README.md                       |   4 +
 bundles/http_admin/civetweb/src/civetweb.c         |   2 +-
 bundles/logging/README.md                          |   4 +
 bundles/logging/log_writers/README.md              |   4 +
 bundles/pubsub/README.md                           |   4 +
 bundles/pubsub/examples/keys/README.md             |   4 +
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c |  16 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 251 ++++--------
 .../src/pubsub_tcp_topic_receiver.c                |   3 +
 bundles/pubsub/pubsub_admin_udp_mc/README.md       |   4 +
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |  15 +-
 .../src/pubsub_udpmc_topic_receiver.c              |   5 +-
 .../src/pubsub_websocket_admin.c                   |  15 +-
 .../src/pubsub_websocket_topic_receiver.c          |   3 +
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |  35 +-
 .../src/pubsub_zmq_topic_receiver.c                |   7 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 118 +++---
 .../pubsub_protocol_lib/CMakeLists.txt             |   8 +-
 .../pubsub_protocol_lib/gtest/CMakeLists.txt}      |  15 +-
 .../gtest/src/PS_WP_common_tests.cc                |  57 +++
 .../pubsub_protocol_lib/gtest/src/main.cc          |  26 ++
 .../src/pubsub_wire_protocol_common.c              |  93 +++--
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |   7 +-
 .../gtest/CMakeLists.txt}                          |  12 +-
 .../gtest/src/PubSubEndpointUtilsTestSuite.cc      |  47 +++
 .../pubsub/pubsub_spi/include/pubsub_endpoint.h    |  16 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |   3 +-
 .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c  |  11 +
 .../src/pubsub_topology_manager.c                  | 452 +++++++++++++--------
 .../src/pubsub_topology_manager.h                  |  16 +-
 bundles/pubsub/test/CMakeLists.txt                 | 142 ++++++-
 .../{ping.properties => deadlock.scope.properties} |   6 +-
 ...{ping.properties => deadlock.scope2.properties} |   6 +-
 bundles/pubsub/test/meta_data/ping.properties      |   2 +-
 bundles/pubsub/test/meta_data/ping2.properties     |   2 +-
 bundles/pubsub/test/meta_data/pong2.properties     |   2 +-
 .../pubsub/test/pstm_deadlock_test/test_runner.cc  | 142 +++++++
 bundles/pubsub/test/test/test_endpoint_runner.cc   |   4 +-
 bundles/pubsub/test/test/test_runner.cc            |  92 ++---
 bundles/remote_services/README.md                  |   6 +-
 bundles/remote_services/discovery_etcd/README.md   |   4 +
 .../remote_service_admin_dfi/README.md             |   4 +
 .../remote_services/remote_services_api/README.md  |   4 +
 bundles/remote_services/rsa_spi/README.md          |   4 +
 bundles/remote_services/topology_manager/README.md |   4 +
 bundles/shell/remote_shell/README.md               |   4 +
 bundles/shell/shell/README.md                      |   4 +
 bundles/shell/shell_tui/README.md                  |   4 +
 bundles/shell/shell_wui/README.md                  |   4 +
 documents/building/README.md                       |   4 +
 documents/cmake_commands/README.md                 |   4 +
 documents/getting_started/README.md                |   4 +
 .../getting_started/creating_a_simple_bundle.md    |   6 +-
 documents/getting_started/using_services_with_c.md |   4 +
 .../getting_started/using_services_with_cxx.md     |   4 +
 documents/intro/README.md                          |   6 +-
 documents/subprojects/README.md                    |   6 +
 examples/celix-examples/README.md                  |   4 +
 examples/celix-examples/http_example/README.md     |   7 +-
 .../celix-examples/services_example_c/README.md    |   4 +
 libs/dependency_manager/README.md                  |   4 +
 libs/dependency_manager/TODO.md                    |   6 +-
 libs/dependency_manager_cxx/README.md              |   4 +
 libs/dependency_manager_cxx/TODO.md                |   6 +-
 libs/etcdlib/README.md                             |   4 +
 libs/framework/src/dm_dependency_manager_impl.c    |  44 +-
 libs/launcher/README.md                            |   4 +
 libs/utils/README.md                               |   4 +
 libs/utils/gtest/src/LogUtilsTestSuite.cc          |   2 -
 misc/experimental/README.md                        |   4 +
 misc/experimental/bundles/config_admin/README.md   |   4 +
 misc/experimental/promise/README.md                |   4 +
 79 files changed, 1351 insertions(+), 570 deletions(-)

diff --cc bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 20eb401,0cbd28f..4734497
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@@ -95,13 -86,9 +93,11 @@@ typedef struct psa_tcp_connection_entr
      void *footerBuffer;
      unsigned int bufferSize;
      void *buffer;
-     unsigned int bufferReadReadOffset;
-     unsigned int expectedBufferReadSize;
-     unsigned int msgSizeReadSize;
+     unsigned int bufferReadSize;
      unsigned int metaBufferSize;
      void *metaBuffer;
 +    struct msghdr msg;
 +    size_t msg_iovlen;        /* Number of elements in the vector.  */
      unsigned int retryCount;
  } psa_tcp_connection_entry_t;
  
@@@ -343,13 -338,6 +348,9 @@@ pubsub_tcpHandler_createEntry(pubsub_tc
          }
          if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
          if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
 +        entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
 +        entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize;
 +        entry->msg_iovlen++;
-         pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER);
      }
      return entry;
  }
@@@ -652,25 -628,16 +641,24 @@@ int pubsub_tcpHandler_listen(pubsub_tcp
  }
  
  //
 -// Setup buffer sizes
 +// Setup receive buffer size
  //
 -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
 -                                               unsigned int maxNofBuffers
 -                                               __attribute__((__unused__)),
 -                                               unsigned int bufferSize) {
 +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
      if (handle != NULL) {
          celixThreadRwlock_writeLock(&handle->dbLock);
 -        handle->bufferSize = bufferSize;
 -        handle->maxNofBuffer = maxNofBuffers;
 +        handle->bufferSize = size;
 +        celixThreadRwlock_unlock(&handle->dbLock);
 +    }
 +    return 0;
 +}
 +
 +//
 +// Setup receive buffer size
 +//
 +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
 +    if (handle != NULL) {
 +        celixThreadRwlock_writeLock(&handle->dbLock);
 +        handle->maxMsgSize = size;
-         handle->maxMsgSize = 4;
          celixThreadRwlock_unlock(&handle->dbLock);
      }
      return 0;
@@@ -809,11 -775,12 +795,12 @@@ static inlin
  void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
  
    if (entry->header.header.payloadSize > 0) {
--    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
++      handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
    }
    if (entry->header.header.metadataSize > 0) {
--    handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
++      handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
                                       entry->header.header.metadataSize, &entry->header);
 -    entry->metaBufferSize = entry->header.header.metadataSize;
++      entry->metaBufferSize = entry->header.header.metadataSize;
    }
    if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
      struct timespec receiveTime;
@@@ -1173,136 -1048,113 +1077,135 @@@ int pubsub_tcpHandler_write(pubsub_tcpH
                  handle->protocol->encodeFooter(handle->protocol->handle, message,
                                                   &footerData,
                                                   &footerDataSize);
 -                entry->footerSize = footerDataSize;
              }
  
 +
              size_t msgSize = 0;
 -            struct msghdr msg;
 -            struct iovec msg_iov[IOV_MAX];
 -            memset(&msg, 0x00, sizeof(struct msghdr));
 -            msg.msg_name = &entry->addr;
 -            msg.msg_namelen = entry->len;
 -            msg.msg_flags = flags;
 -            msg.msg_iov = msg_iov;
 -
 -            // Write generic seralized payload in vector buffer
 -            if (payloadSize && payloadData) {
 -                msg.msg_iovlen++;
 -                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
 -                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
 -                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 -            } else {
 -                // copy serialized vector into vector buffer
 -                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
 +            size_t msgIovLen = 0;
 +            long int nbytes = UINT32_MAX;
 +            while (msgSize < totalMessageSize && nbytes > 0) {
 +                struct msghdr msg;
 +                struct iovec msg_iov[IOV_MAX];
 +                memset(&msg, 0x00, sizeof(struct msghdr));
 +                msg.msg_name = &entry->addr;
 +                msg.msg_namelen = entry->len;
 +                msg.msg_flags = flags;
 +                msg.msg_iov = msg_iov;
 +                size_t msgPartSize = 0;
 +                message->header.payloadPartSize = 0;
 +                message->header.payloadOffset = 0;
 +                message->header.metadataSize = 0;
 +                message->header.isLastSegment = 0;
 +
 +                // Write generic seralized payload in vector buffer
 +                if (payloadSize && payloadData) {
 +                    char *payloadDataBuffer = payloadData;
                      msg.msg_iovlen++;
 -                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
 -                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
 +                    msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize];
 +                    msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize);
 +                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 +                    message->header.payloadPartSize = msgPartSize;
 +                    message->header.payloadOffset = msgSize;
                      msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 +                } else {
 +                    // copy serialized vector into vector buffer
 +                    for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) {
 +                        msg.msg_iovlen++;
 +                        msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base;
 +                        msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len;
 +                        if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize)
 +                            break;
 +                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 +                    }
 +                    message->header.payloadPartSize = msgPartSize;
 +                    message->header.payloadOffset = msgSize;
 +                    msgSize += msgPartSize;
 +                    msgIovLen += (msg.msg_iovlen - 1);
                  }
 -            }
  
 -            // Write optional metadata in vector buffer
 -            if (metadataSize && metadataData) {
 -                msg.msg_iovlen++;
 -                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
 -                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
 -                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 -            }
 +                // Write optional metadata in vector buffer
 +                if ((msgSize < (payloadSize + metadataSize)) &&
 +                    (msgPartSize < entry->maxMsgSize) &&
 +                    (metadataSize && metadataData)) {
 +                    msg.msg_iovlen++;
 +                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
 +                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
 +                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 +                    message->header.metadataSize = metadataSize;
 +                    msgSize += metadataSize;
 +                }
 +                if (msgSize >= totalMessageSize) {
 +                    message->header.isLastSegment = 0x1;
 +                }
  
 -            // Write optional footerData in vector buffer
 -            if (footerData && footerDataSize) {
 -                msg.msg_iovlen++;
 -                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
 -                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
 -                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 -            }
 +                // Write optional footerData in vector buffer
 +                if (footerData && footerDataSize) {
 +                    msg.msg_iovlen++;
 +                    msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
 +                    msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
 +                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
 +                }
  
-                 void *headerData = NULL;
-                 size_t headerSize = 0;
-                 // check if header is not part of the payload (=> headerBufferSize = 0)s
-                 if (entry->headerBufferSize) {
-                     // Encode the header, with payload size and metadata size
-                     handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                                    &headerData,
-                                                    &headerSize);
-                 }
-                 if (!entry->headerBufferSize) {
-                     // Skip header buffer, when header is part of payload;
-                     msg.msg_iov = &msg_iov[1];
-                 } else if (headerSize && headerData) {
-                     // Write header in 1st vector buffer item
-                     msg.msg_iov[0].iov_base = headerData;
-                     msg.msg_iov[0].iov_len = headerSize;
-                     msgPartSize += msg.msg_iov[0].iov_len;
-                     msg.msg_iovlen++;
+             void *headerData = NULL;
+             size_t headerSize = 0;
+             // check if header is not part of the payload (=> headerBufferSize = 0)s
+             if (entry->headerBufferSize) {
 -              headerData = entry->headerBuffer;
+               // Encode the header, with payload size and metadata size
+               handle->protocol->encodeHeader(handle->protocol->handle, message,
+                                              &headerData,
+                                              &headerSize);
 -              entry->headerBufferSize = headerSize;
+             }
+             if (!entry->headerBufferSize) {
+               // Skip header buffer, when header is part of payload;
+               msg.msg_iov = &msg_iov[1];
+             } else if (headerSize && headerData) {
+               // Write header in 1st vector buffer item
+                 msg.msg_iov[0].iov_base = headerData;
+                 msg.msg_iov[0].iov_len = headerSize;
+                 msgSize += msg.msg_iov[0].iov_len;
+                 msg.msg_iovlen++;
+             } else {
+               L_ERROR("[TCP Socket] No header buffer is generated");
+               msg.msg_iovlen = 0;
+             }
+             long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
+             //  When a specific socket keeps reporting errors can indicate a subscriber
+             //  which is not active anymore, the connection will remain until the retry
+             //  counter exceeds the maximum retry count.
+             //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+             if (nbytes == -1) {
+                 if (entry->retryCount < handle->maxSendRetryCount) {
+                     entry->retryCount++;
+                     L_ERROR(
+                         "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
+                         entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
                  } else {
-                     L_ERROR("[TCP Socket] No header buffer is generated");
-                     msg.msg_iovlen = 0;
-                 }
-                 nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags);
-                 //  When a specific socket keeps reporting errors can indicate a subscriber
-                 //  which is not active anymore, the connection will remain until the retry
-                 //  counter exceeds the maximum retry count.
-                 //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-                 if (nbytes == -1) {
-                     if (entry->retryCount < handle->maxSendRetryCount) {
-                         entry->retryCount++;
-                         L_ERROR(
-                             "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                             entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                     } else {
-                         L_ERROR(
-                             "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
-                         connFdCloseQueue[nofConnToClose++] = entry->fd;
-                     }
-                     result = -1; //At least one connection failed sending
-                 } else if (msgPartSize) {
-                     entry->retryCount = 0;
-                     if (nbytes != msgPartSize) {
-                         L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
-                     }
-                 }
-                 // Release data
-                 if (headerData) {
-                     free(headerData);
-                 }
-                 // Note: serialized Payload is deleted by serializer
-                 if (payloadData && (payloadData != message->payload.payload)) {
-                     free(payloadData);
+                     L_ERROR(
+                         "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
+                         entry->fd, handle->maxSendRetryCount, strerror(errno));
+                     connFdCloseQueue[nofConnToClose++] = entry->fd;
                  }
-                 if (metadataData) {
-                     free(metadataData);
-                 }
-                 if (footerData) {
-                     free(footerData);
+                 result = -1; //At least one connection failed sending
+             } else if (msgSize) {
+                 entry->retryCount = 0;
+                 if (nbytes != msgSize) {
+                     L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
                  }
              }
- 
+             // Release data
 -            if (headerData && headerData != entry->headerBuffer) {
++            if (headerData) {
+                 free(headerData);
+             }
+             // Note: serialized Payload is deleted by serializer
+             if (payloadData && (payloadData != message->payload.payload)) {
+                 free(payloadData);
+             }
 -            if (metadataData && metadataData != entry->metaBuffer) {
++            if (metadataData) {
+                 free(metadataData);
+             }
 -            if (footerData && footerData != entry->footerBuffer) {
++            if (footerData) {
+                 free(footerData);
+             }
          }
      }
      celixThreadRwlock_unlock(&handle->dbLock);


Mime
View raw message