celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [celix] branch develop updated: Updated blocking for TCP pubsub admin (#55)
Date Sun, 22 Sep 2019 10:26:57 GMT
This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new e097f28  Updated blocking for TCP pubsub admin (#55)
e097f28 is described below

commit e097f282dd9beda894f9a67b42d25f548e57ca24
Author: rbulter <roybulter@gmail.com>
AuthorDate: Sun Sep 22 12:26:53 2019 +0200

    Updated blocking for TCP pubsub admin (#55)
---
 .../src/pubsub_psa_tcp_constants.h                 |   9 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 163 ++++++++++++---------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   4 +-
 .../src/pubsub_tcp_topic_receiver.c                |   2 +
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |   2 +
 5 files changed, 110 insertions(+), 70 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index d26ec10..d0c0ebd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -31,7 +31,8 @@
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
 #define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        1024
+
+#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000
 
 #define PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE        30
@@ -49,6 +50,12 @@
 #define PUBSUB_TCP_VERBOSE_KEY                  "PSA_TCP_VERBOSE"
 #define PUBSUB_TCP_VERBOSE_DEFAULT              true
 
+#define PUBSUB_TCP_PUBLISHER_BLOCKING_KEY       "PUBSUB_TCP_PUBLISHER_BLOCKING"
+#define PUBSUB_TCP_PUBLISHER_BLOCKING_DEFAULT   true
+
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY      "PUBSUB_TCP_SUBSCRIBER_BLOCKING"
+#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT   true
+
 #define PUBSUB_TCP_PSA_IP_KEY                   "PSA_IP"
 #define PUBSUB_TCP_PSA_ITF_KEY                  "PSA_INTERFACE"
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 82429b2..cfe3e31 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -49,6 +49,7 @@
 #define MAX_MSG_VECTOR_LEN 64
 #define MAX_DEFAULT_BUFFER_SIZE 4u
 
+
 #define READ_STATE_INIT   0u
 #define READ_STATE_HEADER 1u
 #define READ_STATE_DATA   2u
@@ -85,7 +86,8 @@ struct pubsub_tcpHandler {
   unsigned int msgIdOffset;
   unsigned int msgIdSize;
   bool bypassHeader;
-  bool useBlocking;
+  bool useBlockingWrite;
+  bool useBlockingRead;
   celix_thread_rwlock_t dbLock;
   unsigned int timeout;
   hash_map_t *url_map;
@@ -107,7 +109,6 @@ static inline int pubsub_tcpHandler_setInAddr(pubsub_tcpHandler_t *handle,
const
 static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t
*entry, bool lock);
 static inline int pubsub_tcpHandler_closeConnection(pubsub_tcpHandler_t *handle, int fd);
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-//static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd);
 static inline void pubsub_tcpHandler_setupEntry(psa_tcp_connection_entry_t* entry, int fd,
char *url, unsigned int bufferSize);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t* entry);
 
@@ -128,6 +129,7 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(log_helper_t *logHelper)
{
         handle->bypassHeader = false;
         handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
         handle->maxNofBuffer = 1; // Reserved for future Use;
+        handle->useBlockingWrite = true;
         pubsub_tcpHandler_setupEntry(&handle->own, -1, NULL, MAX_DEFAULT_BUFFER_SIZE);
         celixThreadRwlock_create(&handle->dbLock, 0);
         //signal(SIGPIPE, SIG_IGN);
@@ -271,7 +273,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url)
{
             }
         }
         // Make file descriptor NonBlocking
-        if ((!handle->useBlocking) && (rc >= 0)) {
+        if ((!handle->useBlockingRead) && (rc >= 0)) {
             rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
             if (rc < 0) close(fd);
         }
@@ -408,23 +410,6 @@ int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int
fd) {
   return rc;
 }
 
-#ifdef USE_BLOCKING
-static inline
-int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd) {
-    int rc = 0;
-    int flags = fcntl(fd, F_GETFL, 0);
-    if (flags == -1) rc = flags;
-    else {
-        rc = fcntl(fd, F_SETFL, flags & (~O_NONBLOCK));
-        if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to BLOCKING epoll: %s\n", strerror(errno));
-            errno = 0;
-        }
-    }
-    return rc;
-}
-#endif
-
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
     int fd = pubsub_tcpHandler_open(handle, url);
     // Make handler fd entry
@@ -507,7 +492,7 @@ void pubsub_tcpHandler_setUrlInfo(char *url, pubsub_tcpHandler_url_t *url_info)
             url_info->hostname = strtok(strdup(hostname), ":");
             if (port) {
                 port += 1;
-                unsigned int portDigits = (unsigned) atoi(port);
+                unsigned int portDigits = (unsigned int) atoi(port);
                 if (portDigits != 0) url_info->portnr = portDigits;
             }
             free(hostname);
@@ -537,6 +522,7 @@ int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
     return 0;
 }
 
+
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
@@ -555,12 +541,30 @@ void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle,
bool bypassH
     }
 }
 
+void pubsub_tcpHandler_setBlockingWrite(pubsub_tcpHandler_t *handle, bool blocking) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->useBlockingWrite = blocking;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
+void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blocking) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->useBlockingRead = blocking;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+}
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
 int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, unsigned int *size) {
     celixThreadRwlock_writeLock(&handle->dbLock);
+    *index = 0;
+    *size = 0;
     psa_tcp_connection_entry_t *entry = NULL;
     if (fd == handle->own.fd) entry = &handle->own;
     else entry = hashMap_get(handle->fd_map, (void *) (intptr_t) fd);
@@ -582,6 +586,15 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
             // First start looking for header
             entry->readState = READ_STATE_HEADER;
             entry->expectedReadSize = sizeof(pubsub_tcp_msg_header_t);
+            if (entry->expectedReadSize > entry->bufferSize) {
+                char* buffer = realloc(entry->buffer, (size_t )entry->expectedReadSize);
+                if (buffer) {
+                    entry->buffer = buffer;
+                    entry->bufferSize = entry->expectedReadSize;
+                    L_WARN("[TCP Socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
+                           entry->bufferSize, entry->expectedReadSize);
+                }
+            }
         } else {
             // When no header use Max buffer size
             entry->readState = READ_STATE_READY;
@@ -592,16 +605,15 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
     int nbytes = recv(fd, &entry->buffer[entry->bufferReadSize], entry->expectedReadSize, 0);
     if (nbytes < 0) {
         // Handle Socket error, when nbytes == 0 => Connection is lost
-        if (nbytes < 0) {
-            L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
-            errno = 0;
-        }
+        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
+        else L_ERROR("[TCP Socket] read error %s\n", strerror(errno));
+        errno = 0;
     }
-    if ((!handle->bypassHeader)&&(nbytes>0)) {
+    if ((!handle->bypassHeader) && (nbytes > 0)) {
         // Update buffer administration
         entry->bufferReadSize += nbytes;
         entry->expectedReadSize -= nbytes;
-        // When expected data is read then update state
+        // When expected data is read then update states
         if (entry->expectedReadSize <= 0) {
             pubsub_tcp_msg_header_t *pHeader = (pubsub_tcp_msg_header_t *) entry->buffer;
             if (entry->readState == READ_STATE_FIND_HEADER) {
@@ -629,10 +641,14 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
                 // Header is found, read the data from the socket, update state to READ_STATE_DATA
                 int buffer_size = pHeader->bufferSize + entry->bufferReadSize;
                 // When buffer is not big enough, reallocate buffer
-                if (buffer_size > entry->bufferSize) {
-                    entry->buffer = realloc(entry->buffer, buffer_size);
-                    entry->bufferSize = buffer_size;
-                    //L_WARN("[TCP Socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url, entry->bufferSize, buffer_size);
+                if ((buffer_size > entry->bufferSize) && (buffer_size)) {
+                    char* buffer = realloc(entry->buffer, (size_t )buffer_size);
+                    if (buffer) {
+                        entry->buffer = buffer;
+                        entry->bufferSize = buffer_size;
+                        L_WARN("[TCP Socket: %d, url: %s,  realloc read buffer: (%d, %d) \n", entry->fd, entry->url,
+                               entry->bufferSize, buffer_size);
+                    }
                 }
                 // Set data read size
                 entry->expectedReadSize = pHeader->bufferSize;
@@ -641,7 +657,7 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd,
unsigne
             } else if (entry->readState == READ_STATE_DATA) {
                 handle->readSeqNr = pHeader->seqNr;
                 //fprintf(stdout, "ReadSeqNr: Count: %d\n", handle->readSeqNr);
-                nbytes -= sizeof(pubsub_tcp_msg_header_t);
+                nbytes = entry->bufferReadSize - sizeof(pubsub_tcp_msg_header_t);
                 if (nbytes == 0) {
                     errno = 0;
                 }
@@ -655,9 +671,18 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int
fd, unsigne
             }
         }
     }
-    *index = 0;
-    // if read state is not ready, don't process buffer
-    *size = nbytes;
+    if (nbytes > 0 && entry->readState == READ_STATE_READY) {
+        // if read state is not ready, don't process buffer
+        // Check if buffer size is correct
+        pubsub_tcp_msg_header_t *pHeader = (pubsub_tcp_msg_header_t *) entry->buffer;
+        if (nbytes != pHeader->bufferSize) {
+            L_ERROR( "[TCP Socket] Buffer size is not correct %d: %d!=%d  errno: %s",
+                handle->readSeqNr, nbytes, pHeader->bufferSize, strerror(errno));
+            entry->readState = READ_STATE_INIT;
+        } else {
+            *size = nbytes;
+        }
+    }
     celixThreadRwlock_unlock(&handle->dbLock);
     return nbytes;
 }
@@ -730,6 +755,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
     header->marker_end   = MARKER_END_PATTERN;
     header->bufferSize   = size;
     hash_map_iterator_t iter = hashMapIterator_construct(handle->fd_map);
+
     while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
 
@@ -763,19 +789,17 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t
         //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
         if (nbytes == -1) {
             result = ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) ? 0 : -1;
-            L_ERROR("[TCP Socket] Cannot send msg %s\n", strerror(errno));
+            L_ERROR("[TCP Socket] Seq_Id: %d Cannot send msg %s\n", header->seqNr, strerror(errno));
             errno = 0;
-            continue;
         }
         int msgSize = 0;
         for (int i = 0; i < msg.msg_iovlen; i++) {
             msgSize+=msg.msg_iov[i].iov_len;
         }
         if (nbytes != msgSize) {
-            L_ERROR("[TCP Socket] MsgSize not correct: %d != nBytess\n", msgSize, nbytes);
-            continue;
+            L_ERROR("[TCP Socket] Seq; %d, MsgSize not correct: %d != %d (BufferSize: %d \n", header->seqNr, msgSize, nbytes, header->bufferSize);
         }
-        written += nbytes;
+        written = (result == 0) ? written + nbytes : written;
     }
     celixThreadRwlock_unlock(&handle->dbLock);
     return (result == 0 ? written : result);
@@ -792,8 +816,9 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
         int nof_events = 0;
         nof_events = epoll_wait(handle->efd, events, MAX_EPOLL_EVENTS, handle->timeout);
         if (nof_events < 0) {
-          L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
-          errno = 0;
+            if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {}
+            else L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno));
+            errno = 0;
         }
         for (int i = 0; i < nof_events; i++) {
             if ((handle->own.fd >= 0) && (events[i].data.fd == handle->own.fd)) {
@@ -808,7 +833,7 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                   errno = 0;
                 }
                 // Make file descriptor NonBlocking
-                if ((!handle->useBlocking) && (rc >= 0)) {
+                if ((!handle->useBlockingWrite) && (rc >= 0)) {
                     rc = pubsub_tcpHandler_makeNonBlocking(handle, fd);
                     if (rc < 0) pubsub_tcpHandler_freeEntry(&handle->own);
                 }
@@ -846,35 +871,38 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                 int count = 0;
                 bool isReading = true;
                 while(isReading) {
-                  unsigned int index = 0;
-                  unsigned int size = 0;
-                  isReading = (handle->useBlocking) ? false : isReading;
-                  count++;
-                  rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index,
&size);
-                  if (rc <= 0) {
+                    unsigned int index = 0;
+                    unsigned int size = 0;
+                    isReading = (handle->useBlockingRead) ? false : isReading;
+                    count++;
+                    rc = pubsub_tcpHandler_dataAvailable(handle, events[i].data.fd, &index, &size);
+                    if (rc <= 0) {
                         // close connection.
                         if (rc == 0) {
                             pubsub_tcpHandler_closeConnection(handle, events[i].data.fd);
                         }
                         isReading = false;
-                        break;
-                  }
-                  // Handle data
-                  void *buffer = NULL;
-                  pubsub_tcp_msg_header_t *msgHeader = NULL;
-                  rc = pubsub_tcpHandler_read(handle, events[i].data.fd, index, &msgHeader,
&buffer, size);
-                    if (rc != 0) {
-                        isReading = false;
-                        break;
-                  }
-                  celixThreadRwlock_readLock(&handle->dbLock);
-                  if ((handle->processMessageCallback)&&(buffer)) {
-                    struct timespec receiveTime;
-                    clock_gettime(CLOCK_REALTIME, &receiveTime);
-                    handle->processMessageCallback(handle->processMessagePayload, msgHeader,
buffer, size, &receiveTime);
-                    isReading = false;
-                  }
-                  celixThreadRwlock_unlock(&handle->dbLock);
+                        continue;
+                    }
+                    if (size) {
+                        // Handle data
+                        void *buffer = NULL;
+                        pubsub_tcp_msg_header_t *msgHeader = NULL;
+                        rc = pubsub_tcpHandler_read(handle, events[i].data.fd, index, &msgHeader, &buffer, size);
+                        if (rc < 0) {
+                            isReading = false;
+                            continue;
+                        }
+                        celixThreadRwlock_readLock(&handle->dbLock);
+                        if (handle->processMessageCallback) {
+                            struct timespec receiveTime;
+                            clock_gettime(CLOCK_REALTIME, &receiveTime);
+                            handle->processMessageCallback(handle->processMessagePayload, msgHeader, buffer, size,
+                                                           &receiveTime);
+                            isReading = false;
+                        }
+                        celixThreadRwlock_unlock(&handle->dbLock);
+                    }
                 }
             } else if (events[i].events & EPOLLOUT) {
                 int err = 0;
@@ -898,7 +926,6 @@ int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) {
                       event.data.fd = events[i].data.fd;
                       // Register Modify epoll
                       rc = epoll_ctl(handle->efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
-                      //pubsub_tcpHandler_makeBlocking(handle, events[i].data.fd);
                       if (rc < 0) {
                           L_ERROR("[TCP Socket] Cannot create epoll\n");
                       }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index 48abe22..f1be060 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,8 +58,10 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char* url);
 int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, unsigned int
maxNofBuffers, unsigned int bufferSize);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
 void pubsub_tcpHandler_setBypassHeader(pubsub_tcpHandler_t *handle, bool bypassHeader, unsigned
int msgIdOffset, unsigned int msgIdSize);
+void pubsub_tcpHandler_setBlockingWrite(pubsub_tcpHandler_t *handle, bool blocking);
+void pubsub_tcpHandler_setBlockingRead(pubsub_tcpHandler_t *handle, bool blocking);
 
-  int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index,
unsigned int *size);
+int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index,
unsigned int *size);
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_tcp_msg_header_t**
header, void ** buffer, unsigned int size);
 int pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_tcp_msg_header_t* header,
void* buffer, unsigned int size, int flags);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 625200a..23af6c3 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -194,10 +194,12 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context
     }
 
     if (topicProperties != NULL) {
+        bool blocking     = celix_properties_getAsBool((celix_properties_t *) topicProperties,
PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY, PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT);
         bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties,
PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
         long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET,
PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
         long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,
  PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
         pubsub_tcpHandler_setBypassHeader(receiver->socketHandler, bypassHeader, (unsigned
int)msgIdOffset, (unsigned int)msgIdSize);
+        pubsub_tcpHandler_setBlockingRead(receiver->socketHandler, blocking);
     }
 
     psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index a0c245b..6795af1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -140,10 +140,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     }
     sender->metricsEnabled   = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
PSA_TCP_DEFAULT_METRICS_ENABLED);
     if (topicProperties != NULL) {
+        bool blocking     = celix_properties_getAsBool((celix_properties_t *) topicProperties,
PUBSUB_TCP_PUBLISHER_BLOCKING_KEY, PUBSUB_TCP_PUBLISHER_BLOCKING_DEFAULT);
         bool bypassHeader = celix_properties_getAsBool((celix_properties_t *) topicProperties,
PUBSUB_TCP_BYPASS_HEADER, PUBSUB_TCP_DEFAULT_BYPASS_HEADER);
         long msgIdOffset  = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_OFFSET,
PUBSUB_TCP_DEFAULT_MESSAGE_ID_OFFSET);
         long msgIdSize    = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_MESSAGE_ID_SIZE,
  PUBSUB_TCP_DEFAULT_MESSAGE_ID_SIZE);
         pubsub_tcpHandler_setBypassHeader(sender->socketHandler, bypassHeader, (unsigned
int)msgIdOffset, (unsigned int)msgIdSize);
+        pubsub_tcpHandler_setBlockingWrite(sender->socketHandler, blocking);
     }
     /* Check if it's a static endpoint */
     bool isEndPointTypeClient = false;


Mime
View raw message