celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [celix] branch develop updated: Fixes for UDP pubsub admin
Date Fri, 28 Jun 2019 15:48:15 GMT
This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new aaf87f9  Fixes for UDP pubsub admin
aaf87f9 is described below

commit aaf87f997282a11ad2c76912165d5c26f7d82586
Author: Roy Lenferink <lenferinkroy@gmail.com>
AuthorDate: Fri Jun 28 17:10:41 2019 +0200

    Fixes for UDP pubsub admin
---
 bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c | 550 ++++++++++-----------
 bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h |   7 +-
 .../pubsub/pubsub_admin_udp_mc/src/psa_activator.c | 136 ++---
 .../src/pubsub_psa_udpmc_constants.h               |  23 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |  21 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_common.c  |  13 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_common.h  |  15 +-
 .../src/pubsub_udpmc_topic_receiver.c              |  49 +-
 .../src/pubsub_udpmc_topic_receiver.h              |   5 +-
 .../src/pubsub_udpmc_topic_sender.c                |  53 +-
 10 files changed, 430 insertions(+), 442 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
index dd0fbbd..6ed5620 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
@@ -20,8 +20,8 @@
  * large_udp.c
  *
  *  \date       Mar 1, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
  */
 
 #include "large_udp.h"
@@ -34,34 +34,34 @@
 #include <array_list.h>
 #include <pthread.h>
 
-#define MAX_UDP_MSG_SIZE 65535   /* 2^16 -1 */
-#define IP_HEADER_SIZE  20
-#define UDP_HEADER_SIZE 8
-//#define MTU_SIZE    1500
-#define MTU_SIZE    8000
-#define MAX_MSG_VECTOR_LEN 64
+#define MAX_UDP_MSG_SIZE        65535 /* 2^16 -1 */
+#define IP_HEADER_SIZE          20
+#define UDP_HEADER_SIZE         8
+//#define MTU_SIZE                1500
+#define MTU_SIZE                8000
+#define MAX_MSG_VECTOR_LEN      64
 
 //#define NO_IP_FRAGMENTATION
 
 struct largeUdp {
-	unsigned int maxNrLists;
-	array_list_pt udpPartLists;
-	pthread_mutex_t dbLock;
+    unsigned int maxNrLists;
+    array_list_pt udpPartLists;
+    pthread_mutex_t dbLock;
 };
 
 typedef struct udpPartList {
-	unsigned int msg_ident;
-	unsigned int msg_size;
-	unsigned int nrPartsRemaining;
-	char *data;
+    unsigned int msg_ident;
+    unsigned int msg_size;
+    unsigned int nrPartsRemaining;
+    char *data;
 } *udpPartList_pt;
 
 
 typedef struct msg_part_header {
-	unsigned int msg_ident;
-	unsigned int total_msg_size;
-	unsigned int part_msg_size;
-	unsigned int offset;
+    unsigned int msg_ident;
+    unsigned int total_msg_size;
+    unsigned int part_msg_size;
+    unsigned int offset;
 } msg_part_header_t;
 
 #ifdef NO_IP_FRAGMENTATION
@@ -70,28 +70,23 @@ typedef struct msg_part_header {
 #define MAX_PART_SIZE   (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
 #endif
 
-typedef struct msg_part {
-	msg_part_header_t header;
-	char data[MAX_PART_SIZE];
-} msg_part_t;
-
 //
 // Create a handle
 //
 largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
 {
-	printf("## Creating large UDP\n");
-	largeUdp_pt handle = calloc(sizeof(*handle), 1);
-	if(handle != NULL) {
-		handle->maxNrLists = maxNrUdpReceptions;
-		if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
-			free(handle);
-			handle = NULL;
-		}
-		pthread_mutex_init(&handle->dbLock, 0);
-	}
-
-	return handle;
+    printf("## Creating large UDP\n");
+    largeUdp_pt handle = calloc(sizeof(*handle), 1);
+    if (handle != NULL) {
+        handle->maxNrLists = maxNrUdpReceptions;
+        if (arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
+            free(handle);
+            handle = NULL;
+        }
+        pthread_mutex_init(&handle->dbLock, 0);
+    }
+
+    return handle;
 }
 
 //
@@ -99,27 +94,27 @@ largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
 //
 void largeUdp_destroy(largeUdp_pt handle)
 {
-	printf("### Destroying large UDP\n");
-	if(handle != NULL) {
-		pthread_mutex_lock(&handle->dbLock);
-		int nrUdpLists = arrayList_size(handle->udpPartLists);
-		int i;
-		for(i=0; i < nrUdpLists; i++) {
-			udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i);
-			if(udpPartList) {
-				if(udpPartList->data) {
-					free(udpPartList->data);
-					udpPartList->data = NULL;
-				}
-				free(udpPartList);
-			}
-		}
-		arrayList_destroy(handle->udpPartLists);
-		handle->udpPartLists = NULL;
-		pthread_mutex_unlock(&handle->dbLock);
-		pthread_mutex_destroy(&handle->dbLock);
-		free(handle);
-	}
+    printf("### Destroying large UDP\n");
+    if (handle != NULL) {
+        pthread_mutex_lock(&handle->dbLock);
+        int nrUdpLists = arrayList_size(handle->udpPartLists);
+        int i;
+        for (i=0; i < nrUdpLists; i++) {
+            udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, i);
+            if (udpPartList) {
+                if (udpPartList->data) {
+                    free(udpPartList->data);
+                    udpPartList->data = NULL;
+                }
+                free(udpPartList);
+            }
+        }
+        arrayList_destroy(handle->udpPartLists);
+        handle->udpPartLists = NULL;
+        pthread_mutex_unlock(&handle->dbLock);
+        pthread_mutex_destroy(&handle->dbLock);
+        free(handle);
+    }
 }
 
 //
@@ -127,72 +122,72 @@ void largeUdp_destroy(largeUdp_pt handle)
 //
 int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
 {
-	int n;
-	int result = 0;
-	msg_part_header_t header;
-
-	int written = 0;
-	header.msg_ident = (unsigned int)random();
-	header.total_msg_size = 0;
-	for(n = 0; n < len ;n++) {
-		header.total_msg_size += largeMsg_iovec[n].iov_len;
-	}
-	int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
-
-	struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
-	struct msghdr msg;
-	msg.msg_name = dest_addr;
-	msg.msg_namelen = addrlen;
-	msg.msg_flags = 0;
-	msg.msg_iov = msg_iovec;
-	msg.msg_iovlen = 2; // header and payload;
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-
-	msg.msg_iov[0].iov_base = &header;
-	msg.msg_iov[0].iov_len = sizeof(header);
-
-	for(n = 0; n < nr_buffers; n++) {
-
-		header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) >  MAX_PART_SIZE) ?  MAX_PART_SIZE  : (header.total_msg_size - n * MAX_PART_SIZE));
-		header.offset = n * MAX_PART_SIZE;
-		int remainingOffset = header.offset;
-		int recvPart = 0;
-		// find the start of the part
-		while(remainingOffset > largeMsg_iovec[recvPart].iov_len) {
-			remainingOffset -= largeMsg_iovec[recvPart].iov_len;
-			recvPart++;
-		}
-		int remainingData = header.part_msg_size;
-		int sendPart = 1;
-		msg.msg_iovlen = 1;
-
-		// fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
-		while(remainingData > 0) {
-			int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData);
-			msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset;
-			msg.msg_iov[sendPart].iov_len = partLen;
-			remainingData -= partLen;
-			remainingOffset = 0;
-			sendPart++;
-			recvPart++;
-			msg.msg_iovlen++;
-		}
-		int tmp, tmptot;
-		for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) {
-			tmptot += msg.msg_iov[tmp].iov_len;
-		}
-
-		int w = sendmsg(fd, &msg, 0);
-		if(w == -1) {
-			perror("send()");
-			result =  -1;
-			break;
-		}
-		written += w;
-	}
-
-	return (result == 0 ? written : result);
+    int n;
+    int result = 0;
+    msg_part_header_t header;
+
+    int written = 0;
+    header.msg_ident = (unsigned int)random();
+    header.total_msg_size = 0;
+    for (n = 0; n < len ;n++) {
+        header.total_msg_size += largeMsg_iovec[n].iov_len;
+    }
+    int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
+
+    struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
+    struct msghdr msg;
+    msg.msg_name = dest_addr;
+    msg.msg_namelen = addrlen;
+    msg.msg_flags = 0;
+    msg.msg_iov = msg_iovec;
+    msg.msg_iovlen = 2; // header and payload;
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+
+    msg.msg_iov[0].iov_base = &header;
+    msg.msg_iov[0].iov_len = sizeof(header);
+
+    for (n = 0; n < nr_buffers; n++) {
+
+        header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) >  MAX_PART_SIZE) ?  MAX_PART_SIZE  : (header.total_msg_size - n * MAX_PART_SIZE));
+        header.offset = n * MAX_PART_SIZE;
+        int remainingOffset = header.offset;
+        int recvPart = 0;
+        // find the start of the part
+        while (remainingOffset > largeMsg_iovec[recvPart].iov_len) {
+            remainingOffset -= largeMsg_iovec[recvPart].iov_len;
+            recvPart++;
+        }
+        int remainingData = header.part_msg_size;
+        int sendPart = 1;
+        msg.msg_iovlen = 1;
+
+        // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
+        while (remainingData > 0) {
+            int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData);
+            msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset;
+            msg.msg_iov[sendPart].iov_len = partLen;
+            remainingData -= partLen;
+            remainingOffset = 0;
+            sendPart++;
+            recvPart++;
+            msg.msg_iovlen++;
+        }
+        int tmp, tmptot;
+        for (tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) {
+            tmptot += msg.msg_iov[tmp].iov_len;
+        }
+
+        int w = sendmsg(fd, &msg, 0);
+        if (w == -1) {
+            perror("send()");
+            result =  -1;
+            break;
+        }
+        written += w;
+    }
+
+    return (result == 0 ? written : result);
 }
 
 //
@@ -200,46 +195,46 @@ int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, i
 //
 int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
 {
-	int n;
-	int nr_buffers = (count / MAX_PART_SIZE) + 1;
-	int result = 0;
-	msg_part_header_t header;
-
-	int written = 0;
-	header.msg_ident = (unsigned int)random();
-	header.total_msg_size = count;
-	char *databuf = buf;
-
-	struct iovec msg_iovec[2];
-	struct msghdr msg;
-	msg.msg_name = dest_addr;
-	msg.msg_namelen = addrlen;
-	msg.msg_flags = 0;
-	msg.msg_iov = msg_iovec;
-	msg.msg_iovlen = 2; // header and payload;
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-
-	msg.msg_iov[0].iov_base = &header;
-	msg.msg_iov[0].iov_len = sizeof(header);
-
-	for(n = 0; n < nr_buffers; n++) {
-
-		header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) >  MAX_PART_SIZE) ?  MAX_PART_SIZE  : (header.total_msg_size - n * MAX_PART_SIZE));
-		header.offset = n * MAX_PART_SIZE;
-		msg.msg_iov[1].iov_base = &databuf[header.offset];
-		msg.msg_iov[1].iov_len = header.part_msg_size;
-		int w = sendmsg(fd, &msg, 0);
-		if(w == -1) {
-			perror("send()");
-			result =  -1;
-			break;
-		}
-		written += w;
-		//usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost)
-	}
-
-	return (result == 0 ? written : result);
+    int n;
+    int nr_buffers = (count / MAX_PART_SIZE) + 1;
+    int result = 0;
+    msg_part_header_t header;
+
+    int written = 0;
+    header.msg_ident = (unsigned int)random();
+    header.total_msg_size = count;
+    char *databuf = buf;
+
+    struct iovec msg_iovec[2];
+    struct msghdr msg;
+    msg.msg_name = dest_addr;
+    msg.msg_namelen = addrlen;
+    msg.msg_flags = 0;
+    msg.msg_iov = msg_iovec;
+    msg.msg_iovlen = 2; // header and payload;
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+
+    msg.msg_iov[0].iov_base = &header;
+    msg.msg_iov[0].iov_len = sizeof(header);
+
+    for (n = 0; n < nr_buffers; n++) {
+
+        header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) >  MAX_PART_SIZE) ?  MAX_PART_SIZE  : (header.total_msg_size - n * MAX_PART_SIZE));
+        header.offset = n * MAX_PART_SIZE;
+        msg.msg_iov[1].iov_base = &databuf[header.offset];
+        msg.msg_iov[1].iov_len = header.part_msg_size;
+        int w = sendmsg(fd, &msg, 0);
+        if (w == -1) {
+            perror("send()");
+            result =  -1;
+            break;
+        }
+        written += w;
+        //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost)
+    }
+
+    return (result == 0 ? written : result);
 }
 
 //
@@ -247,108 +242,107 @@ int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int fla
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
 bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) {
-	msg_part_header_t header;
-	int result = false;
-	// Only read the header, we don't know yet where to store the payload
-	if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) {
-		perror("read()");
-		return false;
-	}
-
-	struct iovec msg_vec[2];
-	struct msghdr msg;
-	msg.msg_name = NULL;
-	msg.msg_namelen = 0;
-	msg.msg_flags = 0;
-	msg.msg_iov = msg_vec;
-	msg.msg_iovlen = 2; // header and payload;
-	msg.msg_control = NULL;
-	msg.msg_controllen = 0;
-
-	msg.msg_iov[0].iov_base = &header;
-	msg.msg_iov[0].iov_len = sizeof(header);
-
-	pthread_mutex_lock(&handle->dbLock);
-
-	int nrUdpLists = arrayList_size(handle->udpPartLists);
-	int i;
-	bool found = false;
-	for(i = 0; i < nrUdpLists; i++) {
-		udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
-		if(udpPartList->msg_ident == header.msg_ident) {
-			found = true;
-
-			//sanity check
-			if(udpPartList->msg_size != header.total_msg_size) {
-				// Corruption occurred. Remove the existing administration and build up a new one.
-				arrayList_remove(handle->udpPartLists, i);
-				free(udpPartList->data);
-				free(udpPartList);
-				found = false;
-				break;
-			}
-
-			msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
-			msg.msg_iov[1].iov_len = header.part_msg_size;
-			if(recvmsg(fd, &msg, 0)<0){
-				found=true;
-				result=false;
-				break;
-			}
-
-			udpPartList->nrPartsRemaining--;
-			if(udpPartList->nrPartsRemaining == 0) {
-				*index = i;
-				*size = udpPartList->msg_size;
-				result = true;
-				break;
-			} else {
-				result = false; // not complete
-				break;
-			}
-		}
-	}
-
-	if(found == false) {
-		udpPartList_pt udpPartList = NULL;
-		if(nrUdpLists == handle->maxNrLists) {
-			// remove list at index 0
-			udpPartList = arrayList_remove(handle->udpPartLists, 0);
-			fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining );
-			free(udpPartList->data);
-			free(udpPartList);
-			nrUdpLists--;
-		}
-		udpPartList = calloc(sizeof(*udpPartList), 1);
-		udpPartList->msg_ident =  header.msg_ident;
-		udpPartList->msg_size =  header.total_msg_size;
-		udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
-		udpPartList->data = calloc(sizeof(char), header.total_msg_size);
-
-		msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
-		msg.msg_iov[1].iov_len = header.part_msg_size;
-		if(recvmsg(fd, &msg, 0)<0){
-			free(udpPartList->data);
-			free(udpPartList);
-			result=false;
-		}
-		else{
-				arrayList_add(handle->udpPartLists, udpPartList);
-
-			if(udpPartList->nrPartsRemaining == 0) {
-				*index = nrUdpLists;
-				*size = udpPartList->msg_size;
-				result = true;
-			} else {
-				result = false;
-			}
-		}
-
-	}
-
-	pthread_mutex_unlock(&handle->dbLock);
-
-	return result;
+    msg_part_header_t header;
+    int result = false;
+    // Only read the header, we don't know yet where to store the payload
+    if (recv(fd, &header, sizeof(header), MSG_PEEK) < 0) {
+        perror("read()");
+        return false;
+    }
+
+    struct iovec msg_vec[2];
+    struct msghdr msg;
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    msg.msg_flags = 0;
+    msg.msg_iov = msg_vec;
+    msg.msg_iovlen = 2; // header and payload;
+    msg.msg_control = NULL;
+    msg.msg_controllen = 0;
+
+    msg.msg_iov[0].iov_base = &header;
+    msg.msg_iov[0].iov_len = sizeof(header);
+
+    pthread_mutex_lock(&handle->dbLock);
+
+    int nrUdpLists = arrayList_size(handle->udpPartLists);
+    int i;
+    bool found = false;
+    for (i = 0; i < nrUdpLists; i++) {
+        udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
+        if (udpPartList->msg_ident == header.msg_ident) {
+            found = true;
+
+            //sanity check
+            if (udpPartList->msg_size != header.total_msg_size) {
+                // Corruption occurred. Remove the existing administration and build up a new one.
+                arrayList_remove(handle->udpPartLists, i);
+                free(udpPartList->data);
+                free(udpPartList);
+                found = false;
+                break;
+            }
+
+            msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+            msg.msg_iov[1].iov_len = header.part_msg_size;
+            if (recvmsg(fd, &msg, 0)<0) {
+                found=true;
+                result=false;
+                break;
+            }
+
+            udpPartList->nrPartsRemaining--;
+            if (udpPartList->nrPartsRemaining == 0) {
+                *index = i;
+                *size = udpPartList->msg_size;
+                result = true;
+                break;
+            } else {
+                result = false; // not complete
+                break;
+            }
+        }
+    }
+
+    if (found == false) {
+        udpPartList_pt udpPartList = NULL;
+        if (nrUdpLists == handle->maxNrLists) {
+            // remove list at index 0
+            udpPartList = arrayList_remove(handle->udpPartLists, 0);
+            fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining );
+            free(udpPartList->data);
+            free(udpPartList);
+            nrUdpLists--;
+        }
+        udpPartList = calloc(sizeof(*udpPartList), 1);
+        udpPartList->msg_ident =  header.msg_ident;
+        udpPartList->msg_size =  header.total_msg_size;
+        udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
+        udpPartList->data = calloc(sizeof(char), header.total_msg_size);
+
+        msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+        msg.msg_iov[1].iov_len = header.part_msg_size;
+        if (recvmsg(fd, &msg, 0)<0) {
+            free(udpPartList->data);
+            free(udpPartList);
+            result=false;
+        } else {
+            arrayList_add(handle->udpPartLists, udpPartList);
+
+            if (udpPartList->nrPartsRemaining == 0) {
+                *index = nrUdpLists;
+                *size = udpPartList->msg_size;
+                result = true;
+            } else {
+                result = false;
+            }
+        }
+
+    }
+
+    pthread_mutex_unlock(&handle->dbLock);
+
+    return result;
 }
 
 //
@@ -356,17 +350,17 @@ bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, uns
 //
 int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size)
 {
-	int result = 0;
-	pthread_mutex_lock(&handle->dbLock);
-
-	udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index);
-	if(udpPartList) {
-		*buffer = udpPartList->data;
-		free(udpPartList);
-	} else {
-		result = -1;
-	}
-	pthread_mutex_unlock(&handle->dbLock);
-
-	return result;
+    int result = 0;
+    pthread_mutex_lock(&handle->dbLock);
+
+    udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index);
+    if (udpPartList) {
+        *buffer = udpPartList->data;
+        free(udpPartList);
+    } else {
+        result = -1;
+    }
+    pthread_mutex_unlock(&handle->dbLock);
+
+    return result;
 }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
index a21e654..2fcaa76 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
@@ -20,19 +20,20 @@
  * large_udp.h
  *
  *  \date       Mar 1, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
  */
 
 #ifndef _LARGE_UDP_H_
 #define _LARGE_UDP_H_
+
 #include <stdbool.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 
-typedef struct largeUdp  *largeUdp_pt;
+typedef struct largeUdp *largeUdp_pt;
 
 largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions);
 void largeUdp_destroy(largeUdp_pt handle);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index 63de809..f855c04 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -29,86 +29,86 @@
 #include "../../../shell/shell/include/command.h"
 
 typedef struct psa_udpmc_activator {
-	log_helper_t *logHelper;
+    log_helper_t *logHelper;
 
-	pubsub_udpmc_admin_t *admin;
+    pubsub_udpmc_admin_t *admin;
 
-	long serializersTrackerId;
+    long serializersTrackerId;
 
-	pubsub_admin_service_t adminService;
-	long adminSvcId;
+    pubsub_admin_service_t adminService;
+    long adminSvcId;
 
-	command_service_t cmdSvc;
-	long cmdSvcId;
+    command_service_t cmdSvc;
+    long cmdSvcId;
 } psa_udpmc_activator_t;
 
 int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
-	act->adminSvcId = -1L;
-	act->cmdSvcId = -1L;
-	act->serializersTrackerId = -1L;
-
-
-	logHelper_create(ctx, &act->logHelper);
-	logHelper_start(act->logHelper);
-
-	act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
-	celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
-
-	//track serializers
-	if (status == CELIX_SUCCESS) {
-		celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-		opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
-		opts.filter.ignoreServiceLanguage = true;
-		opts.callbackHandle = act->admin;
-		opts.addWithProperties = pubsub_udpmcAdmin_addSerializerSvc;
-		opts.removeWithProperties = pubsub_udpmcAdmin_removeSerializerSvc;
-		act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-	}
-
-	//register pubsub admin service
-	if (status == CELIX_SUCCESS) {
-		pubsub_admin_service_t *psaSvc = &act->adminService;
-		psaSvc->handle = act->admin;
-		psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
-		psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
-		psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
-		psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
-		psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
-		psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
-		psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
-		psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
-		psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
-
-		celix_properties_t *props = celix_properties_create();
-		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
-
-		act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
-	}
-
-	//register shell command service
-	{
-		act->cmdSvc.handle = act->admin;
-		act->cmdSvc.executeCommand = pubsub_udpmcAdmin_executeCommand;
-		celix_properties_t *props = celix_properties_create();
-		celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
-		celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
-		celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
-		act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
-	}
-
-	return status;
+    act->adminSvcId = -1L;
+    act->cmdSvcId = -1L;
+    act->serializersTrackerId = -1L;
+
+
+    logHelper_create(ctx, &act->logHelper);
+    logHelper_start(act->logHelper);
+
+    act->admin = pubsub_udpmcAdmin_create(ctx, act->logHelper);
+    celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
+
+    //track serializers
+    if (status == CELIX_SUCCESS) {
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.callbackHandle = act->admin;
+        opts.addWithProperties = pubsub_udpmcAdmin_addSerializerSvc;
+        opts.removeWithProperties = pubsub_udpmcAdmin_removeSerializerSvc;
+        act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
+
+    //register pubsub admin service
+    if (status == CELIX_SUCCESS) {
+        pubsub_admin_service_t *psaSvc = &act->adminService;
+        psaSvc->handle = act->admin;
+        psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
+        psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
+        psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+        psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
+        psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
+        psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
+        psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
+        psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
+        psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);
+
+        act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
+    }
+
+    //register shell command service
+    {
+        act->cmdSvc.handle = act->admin;
+        act->cmdSvc.executeCommand = pubsub_udpmcAdmin_executeCommand;
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, OSGI_SHELL_COMMAND_NAME, "psa_udpmc");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_USAGE, "psa_udpmc");
+        celix_properties_set(props, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the UDPMC PSA");
+        act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, props);
+    }
+
+    return status;
 }
 
 int psa_udpmc_stop(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
-	celix_bundleContext_unregisterService(ctx, act->adminSvcId);
-	celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
-	celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
-	pubsub_udpmcAdmin_destroy(act->admin);
+    celix_bundleContext_unregisterService(ctx, act->adminSvcId);
+    celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
+    celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
+    pubsub_udpmcAdmin_destroy(act->admin);
 
-	logHelper_stop(act->logHelper);
-	logHelper_destroy(&act->logHelper);
+    logHelper_stop(act->logHelper);
+    logHelper_destroy(&act->logHelper);
 
-	return CELIX_SUCCESS;
+    return CELIX_SUCCESS;
 }
 
 CELIX_GEN_BUNDLE_ACTIVATOR(psa_udpmc_activator_t, psa_udpmc_start, psa_udpmc_stop);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
index 2d3df2d..7aa5304 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -21,27 +21,28 @@
 #define PUBSUB_PSA_UDPMC_CONSTANTS_H_
 
 
-#define PSA_UDPMC_PUBSUB_ADMIN_TYPE	            "udp_mc"
+#define PSA_UDPMC_PUBSUB_ADMIN_TYPE                 "udp_mc"
 
-#define PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE 		70
-#define PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE 	30
-#define PSA_UDPMC_DEFAULT_SCORE 				50
+#define PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE          70
+#define PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE         30
+#define PSA_UDPMC_DEFAULT_SCORE                     50
 
-#define PSA_UDPMC_QOS_SAMPLE_SCORE_KEY 			"PSA_UDPMC_QOS_SAMPLE_SCORE"
-#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY 		"PSA_UDPMC_QOS_CONTROL_SCORE"
-#define PSA_UDPMC_DEFAULT_SCORE_KEY 			"PSA_UDPMC_DEFAULT_SCORE"
+#define PSA_UDPMC_QOS_SAMPLE_SCORE_KEY              "PSA_UDPMC_QOS_SAMPLE_SCORE"
+#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY             "PSA_UDPMC_QOS_CONTROL_SCORE"
+#define PSA_UDPMC_DEFAULT_SCORE_KEY                 "PSA_UDPMC_DEFAULT_SCORE"
 
 
 #define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY			"udpmc.socket_address"
-#define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socker_port"
+#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY             "udpmc.socket_address"
+#define PUBSUB_UDPMC_SOCKET_PORT_KEY                "udpmc.socket_port"
 
-#define PUBSUB_UDPMC_IP_KEY 	                    "PSA_IP"
-#define PUBSUB_UDPMC_ITF_KEY	                    "PSA_INTERFACE"
+#define PUBSUB_UDPMC_IP_KEY                         "PSA_IP"
+#define PUBSUB_UDPMC_ITF_KEY                        "PSA_INTERFACE"
 #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
 #define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
 
 #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
+#define PUBSUB_UDPMC_MULTICAST_IP_DEFAULT           "224.100.1.1"
 #define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
 
 /**
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 03d882b..1614657 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -33,16 +33,13 @@
 #include "pubsub_udpmc_topic_sender.h"
 #include "pubsub_udpmc_topic_receiver.h"
 
-#define PUBSUB_UDPMC_MC_IP_DEFAULT                     "224.100.1.1"
-#define PUBSUB_UDPMC_SOCKET_ADDRESS_KEY                "udpmc.socket_address"
-#define PUBSUB_UDPMC_SOCKET_PORT_KEY                   "udpmc.socket_port"
 
 #define L_DEBUG(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 #define L_INFO(...) \
-    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__);
+    logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
 #define L_WARN(...) \
-    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__);
+    logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 #define L_ERROR(...) \
     logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
@@ -105,7 +102,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
     int sendSocket = -1;
 
     const char *mcIpProp = celix_bundleContext_getProperty(ctx, PUBSUB_UDPMC_IP_KEY, NULL);
-    if(mcIpProp != NULL) {
+    if (mcIpProp != NULL) {
         if (strchr(mcIpProp, '/') != NULL) {
             // IP with subnet prefix specified
             char *found_if_ip = calloc(16, sizeof(char));
@@ -178,7 +175,7 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
     if (mc_ip != NULL) {
         psa->mcIpAddress = mc_ip;
     } else {
-        psa->mcIpAddress = strdup(PUBSUB_UDPMC_MC_IP_DEFAULT);
+        psa->mcIpAddress = strdup(PUBSUB_UDPMC_MULTICAST_IP_DEFAULT);
     }
     if (psa->verbose) {
         L_INFO("[PSA_UDPMC] Using %s for service annunciation", psa->mcIpAddress);
@@ -459,8 +456,8 @@ static celix_status_t pubsub_udpmcAdmin_connectEndpointToReceiver(pubsub_udpmc_a
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-    const char *sockAddress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
-    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+    const char *sockAddress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
 
     bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
 
@@ -513,11 +510,11 @@ static celix_status_t pubsub_udpmcAdmin_disconnectEndpointFromReceiver(pubsub_ud
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
     const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
     const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-    const char *sockAdress = celix_properties_get(endpoint, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, NULL);
-    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY, -1L);
+    const char *sockAdress = celix_properties_get(endpoint, PUBSUB_UDPMC_SOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, PUBSUB_UDPMC_SOCKET_PORT_KEY, -1L);
 
     if (type == NULL || sockAdress == NULL || sockPort < 0) {
-        fprintf(stderr, "[PSA UPDMC] Error got endpoint without udpmc socket address/port or endpoint type\n");
+        L_WARN("[PSA UPDMC] Error disconnecting from endpoint without udpmc socket address/port or endpoint type.");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
         if (eScope != NULL && eTopic != NULL && type != NULL &&
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
index c0a5cc1..0a75ddd 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
@@ -24,15 +24,16 @@ int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), con
     return 0;
 }
 
-bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_t *msg) {
-    bool check=false;
-    int major=0,minor=0;
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr) {
+    bool check = false;
 
-    if(msgVersion!=NULL){
+    if (msgVersion != NULL) {
+        int major = 0, minor = 0;
         version_getMajor(msgVersion,&major);
         version_getMinor(msgVersion,&minor);
-        if(msg->major==((unsigned char)major)){ /* Different major means incompatible */
-            check = (msg->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+
+        if (hdr->major == ((unsigned char) major)) { /* Different major means incompatible */
+            check = (hdr->minor >= ((unsigned char) minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
         }
     }
 
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
index 481a316..461efb9 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
@@ -24,17 +24,16 @@
 
 #include "version.h"
 
-typedef struct pubsub_udp_msg {
-    int typeid;
-    char major;
-    char minor;
-    unsigned int payloadSize;
-    unsigned char *payload;
-} pubsub_udp_msg_t;
+typedef struct pubsub_udp_msg_header {
+    unsigned int type;
+    unsigned char major;
+    unsigned char minor;
+} pubsub_udp_msg_header_t;
+
 
 int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
 
-bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_t *msg);
+bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr);
 
 
 #endif //CELIX_PUBSUB_UDPMC_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index cd3da24..209b8d1 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -95,13 +95,11 @@ typedef struct psa_udpmc_subscriber_entry {
     bool initialized; //true if the init function is called through the receive thread
 } psa_udpmc_subscriber_entry_t;
 
-
-typedef struct pubsub_msg{
-    int typeid;
-    char major;
-    char minor;
+typedef struct pubsub_udp_msg {
+    pubsub_udp_msg_header_t header;
     unsigned int payloadSize;
-} pubsub_msg_t;
+    char payload[];
+} pubsub_udp_msg_t;
 
 static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void pubsub_udpmcTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
@@ -261,10 +259,7 @@ long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *re
     return receiver->serializerSvcId;
 }
 
-void pubsub_udpmcTopicReceiver_connectTo(
-        pubsub_udpmc_topic_receiver_t *receiver,
-        const char *socketAddress,
-        long socketPort) {
+void pubsub_udpmcTopicReceiver_connectTo(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort) {
     printf("[PSA UDPMC] TopicReceiver %s/%s connect to socket address = %s:%li\n", receiver->scope, receiver->topic, socketAddress, socketPort);
 
     char *key = NULL;
@@ -397,13 +392,13 @@ static void* psa_udpmc_recvThread(void * data) {
 
         int nfds = epoll_wait(receiver->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
         int i;
-        for(i = 0; i < nfds; i++ ) {
+        for (i = 0; i < nfds; i++ ) {
             unsigned int index;
             unsigned int size;
-            if(largeUdp_dataAvailable(receiver->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
+            if (largeUdp_dataAvailable(receiver->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
                 // Handle data
                 pubsub_udp_msg_t *udpMsg = NULL;
-                if(largeUdp_read(receiver->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
+                if (largeUdp_read(receiver->largeUdpHandle, index, (void**) &udpMsg, size) != 0) {
                     printf("[PSA_UDPMC]: ERROR largeUdp_read with index %d\n", index);
                     continue;
                 }
@@ -438,38 +433,35 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
 
         pubsub_msg_serializer_t *msgSer = NULL;
         if (entry->msgTypes != NULL) {
-            msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->typeid);
+            msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) msg->header.type);
         }
         if (msgSer == NULL) {
-            printf("[PSA_UDPMC] Serializer not available for message %d.\n",msg->typeid);
-        } else{
+            printf("[PSA_UDPMC] Serializer not available for message %d.\n", msg->header.type);
+        } else {
             void *msgInst = NULL;
-            bool validVersion = psa_udpmc_checkVersion(msgSer->msgVersion, msg);
-
-            if(validVersion){
+            bool validVersion = psa_udpmc_checkVersion(msgSer->msgVersion, &msg->header);
 
+            if (validVersion) {
                 celix_status_t status = msgSer->deserialize(msgSer->handle, (const void *)msg->payload, 0, &msgInst);
 
                 if (status == CELIX_SUCCESS) {
                     bool release = true;
                     pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->typeid, msgInst, &release);
+                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
 
-                    if(release){
+                    if (release) {
                         msgSer->freeMsg(msgSer->handle, msgInst);
                     }
-                }
-                else{
+                } else {
                     printf("[PSA_UDPMC] Cannot deserialize msgType %s.\n",msgSer->msgName);
                 }
 
-            }
-            else{
-                int major=0,minor=0;
+            } else {
+                int major = 0, minor = 0;
                 version_getMajor(msgSer->msgVersion,&major);
                 version_getMinor(msgSer->msgVersion,&minor);
                 printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
-                       msgSer->msgName,major,minor,msg->major, msg->minor);
+                       msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
             }
 
         }
@@ -535,10 +527,11 @@ static void psa_udpmc_connectToAllRequestedConnections(pubsub_udpmc_topic_receiv
         hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_udpmc_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (!entry->connected){
+            if (!entry->connected) {
                 if (psa_udpmc_connectToEntry(receiver, entry)) {
                     entry->connected = true;
                 } else {
+                    L_WARN("[PSA_UDPMC_TR] Error connecting to address %s. (%s)", entry->socketAddress, strerror(errno));
                     allConnected = false;
                 }
             }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
index 7eab09a..1adc8a4 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.h
@@ -42,10 +42,7 @@ void pubsub_udpmcTopicReceiver_listConnections(pubsub_udpmc_topic_receiver_t *re
 
 long pubsub_udpmcTopicReceiver_serializerSvcId(pubsub_udpmc_topic_receiver_t *receiver);
 
-void pubsub_udpmcTopicReceiver_connectTo(
-        pubsub_udpmc_topic_receiver_t *receiver,
-        const char *socketAddress,
-        long socketPort);
+void pubsub_udpmcTopicReceiver_connectTo(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
 void pubsub_udpmcTopicReceiver_disconnectFrom(pubsub_udpmc_topic_receiver_t *receiver, const char *socketAddress, long socketPort);
 
 #endif //CELIX_PUBSUB_UDPMC_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 8d1f61e..22d1efb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -30,11 +30,11 @@
 #include "large_udp.h"
 #include "pubsub_udpmc_common.h"
 
-#define FIRST_SEND_DELAY_IN_SECONDS             2
+#define FIRST_SEND_DELAY_IN_SECONDS     2
 
 //TODO make configurable
-#define UDP_BASE_PORT	                        49152
-#define UDP_MAX_PORT	                        65000
+#define UDP_BASE_PORT                   49152
+#define UDP_MAX_PORT                    65000
 
 
 struct pubsub_udpmc_topic_sender {
@@ -70,6 +70,12 @@ typedef struct psa_udpmc_bounded_service_entry {
     largeUdp_pt largeUdpHandle;
 } psa_udpmc_bounded_service_entry_t;
 
+typedef struct pubsub_msg {
+    pubsub_udp_msg_header_t *header;
+    unsigned int payloadSize;
+    char *payload;
+} pubsub_udp_msg_t;
+
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
@@ -254,50 +260,49 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
     }
 
     if (msgSer != NULL) {
-        int major=0, minor=0;
-
         void* serializedOutput = NULL;
         size_t serializedOutputLen = 0;
-        if (msgSer->serialize(msgSer->handle,inMsg,&serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
-
-            pubsub_udp_msg_t *msg = calloc(1, sizeof(*msg));
-            msg->typeid = msgTypeId;
 
+        if (msgSer->serialize(msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen) == CELIX_SUCCESS) {
+            pubsub_udp_msg_header_t *msg_hdr = calloc(1, sizeof(*msg_hdr));
+            msg_hdr->type = msgTypeId;
 
-            if (msgSer->msgVersion != NULL){
+            if (msgSer->msgVersion != NULL) {
+                int major = 0, minor = 0;
                 version_getMajor(msgSer->msgVersion, &major);
                 version_getMinor(msgSer->msgVersion, &minor);
-                msg->major = (unsigned char)major;
-                msg->minor = (unsigned char)minor;
+                msg_hdr->major = (unsigned char) major;
+                msg_hdr->minor = (unsigned char) minor;
             }
 
-
-            msg->payload = (unsigned char*) serializedOutput;
-            msg->payloadSize = (unsigned int)serializedOutputLen;
-
+            pubsub_udp_msg_t *msg = calloc(1, sizeof(*msg));
+            msg->header = msg_hdr;
+            msg->payload = (char *) serializedOutput;
+            msg->payloadSize = (unsigned int) serializedOutputLen;
 
             if (psa_udpmc_sendMsg(entry, msg) == false) {
                 status = -1;
             }
             free(msg);
+            free(msg_hdr);
             free(serializedOutput);
         } else {
             printf("[PSA_UDPMC/TopicSender] Serialization of msg type id %d failed\n", msgTypeId);
-            status=-1;
+            status = -1;
         }
 
     } else {
         printf("[PSA_UDPMC/TopicSender] No msg serializer available for msg type id %d\n", msgTypeId);
-        status=-1;
+        status = -1;
     }
     return status;
 }
 
-static void delay_first_send_for_late_joiners(){
+static void delay_first_send_for_late_joiners() {
 
     static bool firstSend = true;
 
-    if(firstSend){
+    if (firstSend) {
         printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
         sleep(FIRST_SEND_DELAY_IN_SECONDS);
         firstSend = false;
@@ -309,8 +314,8 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_u
     bool ret = true;
 
     struct iovec msg_iovec[iovec_len];
-    msg_iovec[0].iov_base = msg;
-    msg_iovec[0].iov_len = sizeof(*msg);
+    msg_iovec[0].iov_base = msg->header;
+    msg_iovec[0].iov_len = sizeof(*msg->header);
     msg_iovec[1].iov_base = &msg->payloadSize;
     msg_iovec[1].iov_len = sizeof(msg->payloadSize);
     msg_iovec[2].iov_base = msg->payload;
@@ -318,7 +323,7 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_u
 
     delay_first_send_for_late_joiners();
 
-    if(largeUdp_sendmsg(entry->largeUdpHandle, entry->parent->sendSocket, msg_iovec, iovec_len, 0, &entry->parent->destAddr, sizeof(entry->parent->destAddr)) == -1) {
+    if (largeUdp_sendmsg(entry->largeUdpHandle, entry->parent->sendSocket, msg_iovec, iovec_len, 0, &entry->parent->destAddr, sizeof(entry->parent->destAddr)) == -1) {
         perror("send_pubsub_msg:sendSocket");
         ret = false;
     }
@@ -326,7 +331,7 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_u
     return ret;
 }
 
-static unsigned int rand_range(unsigned int min, unsigned int max){
+static unsigned int rand_range(unsigned int min, unsigned int max) {
     double scaled = ((double)random())/((double)RAND_MAX);
     return (unsigned int)((max-min+1)*scaled + min);
 }


Mime
View raw message