celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [40/60] [abbrv] [partial] celix git commit: CELIX-424: Cleans up the directory structure. Moves all libraries to the libs subdir and all bundles to the bundles subdir
Date Sun, 27 May 2018 18:52:48 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
new file mode 100644
index 0000000..7455925
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.c
@@ -0,0 +1,372 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * large_udp.c
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "large_udp.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <array_list.h>
+#include <pthread.h>
+
+/* Largest UDP message size assumed by this file: 2^16 - 1 bytes. */
+#define MAX_UDP_MSG_SIZE 65535   /* 2^16 -1 */
+/* Header sizes subtracted from the size budget when MAX_PART_SIZE is computed. */
+#define IP_HEADER_SIZE  20
+#define UDP_HEADER_SIZE 8
+/* Size budget used for MAX_PART_SIZE only when NO_IP_FRAGMENTATION is defined. */
+//#define MTU_SIZE    1500
+#define MTU_SIZE    8000
+/* Capacity of the on-stack iovec array built per datagram in largeUdp_sendmsg(); slot 0 holds the part header. */
+#define MAX_MSG_VECTOR_LEN 64
+
+/* Define to derive MAX_PART_SIZE from MTU_SIZE instead of MAX_UDP_MSG_SIZE. */
+//#define NO_IP_FRAGMENTATION
+
+/* State behind a largeUdp_pt handle: the set of messages currently being reassembled. */
+struct largeUdp {
+	/* Upper bound on tracked messages; when reached, largeUdp_dataAvailable() drops the entry at index 0. */
+	unsigned int maxNrLists;
+	/* List of udpPartList_pt entries, one per message identifier seen so far. */
+	array_list_pt udpPartLists;
+	/* Locked around every access to udpPartLists in this file. */
+	pthread_mutex_t dbLock;
+};
+
+/* Bookkeeping for one message that is being reassembled from its parts. */
+typedef struct udpPartList {
+	/* Identifier copied from the part header; parts with the same value belong together. */
+	unsigned int msg_ident;
+	/* Total message size in bytes as announced by the first received part. */
+	unsigned int msg_size;
+	/* Starts at msg_size / MAX_PART_SIZE and is decremented for each later part; 0 means complete. */
+	unsigned int nrPartsRemaining;
+	/* Buffer of msg_size bytes; each part is received directly at its offset. */
+	char *data;
+} *udpPartList_pt;
+
+
+/*
+ * Header placed in front of the payload of every datagram.
+ * It is sent as the raw in-memory struct (no byte-order conversion is done in this file).
+ */
+typedef struct msg_part_header {
+	/* Value drawn from random() once per message by the sender. */
+	unsigned int msg_ident;
+	/* Size in bytes of the complete message. */
+	unsigned int total_msg_size;
+	/* Size in bytes of the payload carried by this datagram. */
+	unsigned int part_msg_size;
+	/* Byte position of this payload within the complete message. */
+	unsigned int offset;
+} msg_part_header_t;
+
+/*
+ * Maximum payload bytes per datagram: the size budget (MTU_SIZE or MAX_UDP_MSG_SIZE)
+ * minus the IP header, the UDP header and the part header.
+ * The expression has type size_t because it contains sizeof.
+ */
+#ifdef NO_IP_FRAGMENTATION
+#define MAX_PART_SIZE   (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#else
+#define MAX_PART_SIZE   (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) ))
+#endif
+
+/* A part header followed by a maximum-size payload. NOTE(review): not referenced by the functions visible in this file. */
+typedef struct msg_part {
+	msg_part_header_t header;
+	char data[MAX_PART_SIZE];
+} msg_part_t;
+
+//
+// Create a handle
+//
+// maxNrUdpReceptions: maximum number of messages that are tracked for reassembly at the same time.
+// Returns the new handle, or NULL when allocating or initialising any part of it fails.
+//
+largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions)
+{
+	printf("## Creating large UDP\n");
+	largeUdp_pt handle = calloc(1, sizeof(*handle));
+	if(handle == NULL) {
+		return NULL;
+	}
+
+	handle->maxNrLists = maxNrUdpReceptions;
+	if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) {
+		free(handle);
+		return NULL;
+	}
+
+	// Initialise the lock only once the handle is known to be valid.
+	if(pthread_mutex_init(&handle->dbLock, NULL) != 0) {
+		arrayList_destroy(handle->udpPartLists);
+		free(handle);
+		return NULL;
+	}
+
+	return handle;
+}
+
+//
+// Destroys the handle
+//
+// Frees every partially received message that is still tracked, then the list, the lock and the handle itself.
+// A NULL handle is accepted and ignored.
+//
+void largeUdp_destroy(largeUdp_pt handle)
+{
+	printf("### Destroying large UDP\n");
+	if(handle == NULL) {
+		return;
+	}
+
+	pthread_mutex_lock(&handle->dbLock);
+	int nrUdpLists = arrayList_size(handle->udpPartLists);
+	int i;
+	for(i = 0; i < nrUdpLists; i++) {
+		// Always take the head: a removal shifts the remaining entries down, so
+		// removing at a rising index would skip (and leak) every other entry.
+		udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, 0);
+		if(udpPartList != NULL) {
+			free(udpPartList->data);
+			free(udpPartList);
+		}
+	}
+	arrayList_destroy(handle->udpPartLists);
+	handle->udpPartLists = NULL;
+	pthread_mutex_unlock(&handle->dbLock);
+	pthread_mutex_destroy(&handle->dbLock);
+	free(handle);
+}
+
+//
+// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP.
+//
+// handle:         not used by this function
+// fd:             socket the datagrams are sent on
+// largeMsg_iovec: 'len' buffers that together form the message
+// flags:          accepted for interface compatibility; every sendmsg() call is made with flags 0
+// dest_addr:      destination address, 'addrlen' bytes long
+//
+// Returns the summed sendmsg() results (so the part headers are included in the count),
+// or -1 when a send fails or a single part would need more than MAX_MSG_VECTOR_LEN - 1 buffers.
+//
+int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
+{
+	int n;
+	int result = 0;
+	int written = 0;
+	msg_part_header_t header;
+
+	(void)handle;
+	(void)flags;
+
+	header.msg_ident = (unsigned int)random();
+	header.total_msg_size = 0;
+	for(n = 0; n < len ;n++) {
+		header.total_msg_size += largeMsg_iovec[n].iov_len;
+	}
+	// largeUdp_dataAvailable() counts on (total / MAX_PART_SIZE) + 1 datagrams, so a total that is
+	// an exact multiple of MAX_PART_SIZE is deliberately followed by one empty part.
+	int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1;
+
+	struct iovec msg_iovec[MAX_MSG_VECTOR_LEN];
+	struct msghdr msg;
+	msg.msg_name = dest_addr;
+	msg.msg_namelen = addrlen;
+	msg.msg_flags = 0;
+	msg.msg_iov = msg_iovec;
+	msg.msg_iovlen = 1;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+
+	// Slot 0 always carries the header; it is refilled before every send.
+	msg.msg_iov[0].iov_base = &header;
+	msg.msg_iov[0].iov_len = sizeof(header);
+
+	for(n = 0; n < nr_buffers; n++) {
+		size_t partStart = (size_t)n * MAX_PART_SIZE;
+		size_t bytesLeft = header.total_msg_size - partStart;
+
+		header.part_msg_size = (bytesLeft > MAX_PART_SIZE) ? MAX_PART_SIZE : bytesLeft;
+		header.offset = partStart;
+
+		// find the input buffer (and the position inside it) where this part starts
+		size_t skip = header.offset;
+		int srcIdx = 0;
+		while(srcIdx < len && skip >= largeMsg_iovec[srcIdx].iov_len) {
+			skip -= largeMsg_iovec[srcIdx].iov_len;
+			srcIdx++;
+		}
+
+		// fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal.
+		size_t remainingData = header.part_msg_size;
+		int dstIdx = 1;
+		while(remainingData > 0 && srcIdx < len && dstIdx < MAX_MSG_VECTOR_LEN) {
+			size_t available = largeMsg_iovec[srcIdx].iov_len - skip;
+			size_t partLen = (available <= remainingData) ? available : remainingData;
+			if(partLen > 0) {
+				msg.msg_iov[dstIdx].iov_base = (char *)largeMsg_iovec[srcIdx].iov_base + skip;
+				msg.msg_iov[dstIdx].iov_len = partLen;
+				dstIdx++;
+			}
+			remainingData -= partLen;
+			skip = 0;
+			srcIdx++;
+		}
+
+		if(remainingData > 0) {
+			// The part needs more output slots than msg_iovec has; refuse rather than write past the array.
+			fprintf(stderr, "ERROR: large UDP part spans more than %d buffers\n", MAX_MSG_VECTOR_LEN - 1);
+			errno = EMSGSIZE;
+			result = -1;
+			break;
+		}
+		msg.msg_iovlen = dstIdx;
+
+		int w = sendmsg(fd, &msg, 0);
+		if(w == -1) {
+			perror("send()");
+			result =  -1;
+			break;
+		}
+		written += w;
+	}
+
+	return (result == 0 ? written : result);
+}
+
+//
+// Send one contiguous buffer as a large UDP message: the buffer is cut into parts of at most
+// MAX_PART_SIZE bytes and every part is sent as its own datagram, preceded by a part header.
+//
+// 'handle' and 'flags' are not used; sendmsg() is always called with flags 0.
+// Returns the summed sendmsg() results, or -1 as soon as one send fails.
+//
+int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen)
+{
+	msg_part_header_t header;
+	struct iovec msg_iovec[2];
+	struct msghdr msg;
+	char *payload = buf;
+	int partCount = (count / MAX_PART_SIZE) + 1;
+	int totalSent = 0;
+	int part = 0;
+
+	header.msg_ident = (unsigned int)random();
+	header.total_msg_size = count;
+
+	msg.msg_name = dest_addr;
+	msg.msg_namelen = addrlen;
+	msg.msg_flags = 0;
+	msg.msg_iov = msg_iovec;
+	msg.msg_iovlen = 2; // header and payload;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+
+	msg_iovec[0].iov_base = &header;
+	msg_iovec[0].iov_len = sizeof(header);
+
+	while(part < partCount) {
+		if((header.total_msg_size - part * MAX_PART_SIZE) > MAX_PART_SIZE) {
+			header.part_msg_size = MAX_PART_SIZE;
+		} else {
+			header.part_msg_size = header.total_msg_size - part * MAX_PART_SIZE;
+		}
+		header.offset = part * MAX_PART_SIZE;
+
+		msg_iovec[1].iov_base = &payload[header.offset];
+		msg_iovec[1].iov_len = header.part_msg_size;
+
+		int sent = sendmsg(fd, &msg, 0);
+		if(sent == -1) {
+			perror("send()");
+			return -1;
+		}
+		totalSent += sent;
+		part++;
+		// TODO: without a short sleep here a UDP buffer overflow occurs and parts go missing at the reception side (at least via localhost)
+	}
+
+	return totalSent;
+}
+
+//
+// Reads data from the filedescriptor which has data (determined by epoll()) and stores it in the internal structure
+// If the message is completely reassembled true is returned and the index and size have valid values
+//
+// The part header is peeked first and checked before anything is written: a datagram that is shorter
+// than a header, or whose offset/size would fall outside the announced message, is consumed and dropped.
+// Returns false when the message is not complete yet, when the datagram was dropped or when receiving failed.
+//
+bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) {
+	msg_part_header_t header;
+	bool result = false;
+
+	// Only read the header, we don't know yet where to store the payload
+	ssize_t peeked = recv(fd, &header, sizeof(header), MSG_PEEK);
+	if(peeked < 0) {
+		perror("read()");
+		return false;
+	}
+
+	// The header comes straight from the network and is used as a write position and length below.
+	if((size_t)peeked < sizeof(header)
+			|| header.offset > header.total_msg_size
+			|| header.part_msg_size > header.total_msg_size - header.offset) {
+		fprintf(stderr, "ERROR: Dropping malformed large UDP part\n");
+		// Consume the peeked datagram, otherwise the socket keeps reporting it as readable.
+		(void)recv(fd, &header, sizeof(header), 0);
+		return false;
+	}
+
+	struct iovec msg_vec[2];
+	struct msghdr msg;
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_flags = 0;
+	msg.msg_iov = msg_vec;
+	msg.msg_iovlen = 2; // header and payload;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+
+	msg.msg_iov[0].iov_base = &header;
+	msg.msg_iov[0].iov_len = sizeof(header);
+
+	pthread_mutex_lock(&handle->dbLock);
+
+	int nrUdpLists = arrayList_size(handle->udpPartLists);
+	int i;
+	bool found = false;
+	for(i = 0; i < nrUdpLists; i++) {
+		udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i);
+		if(udpPartList->msg_ident != header.msg_ident) {
+			continue;
+		}
+
+		//sanity check
+		if(udpPartList->msg_size != header.total_msg_size) {
+			// Corruption occurred. Remove the existing administration and build up a new one.
+			arrayList_remove(handle->udpPartLists, i);
+			free(udpPartList->data);
+			free(udpPartList);
+			// Keep the count in step with the list; it decides on eviction and is the index of the new entry.
+			nrUdpLists--;
+			break;
+		}
+
+		found = true;
+		msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+		msg.msg_iov[1].iov_len = header.part_msg_size;
+		if(recvmsg(fd, &msg, 0) >= 0) {
+			udpPartList->nrPartsRemaining--;
+			if(udpPartList->nrPartsRemaining == 0) {
+				*index = i;
+				*size = udpPartList->msg_size;
+				result = true;
+			}
+		}
+		break;
+	}
+
+	if(!found) {
+		if(nrUdpLists > 0 && (unsigned int)nrUdpLists >= handle->maxNrLists) {
+			// remove list at index 0
+			udpPartList_pt evicted = arrayList_remove(handle->udpPartLists, 0);
+			if(evicted != NULL) {
+				fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",evicted->msg_ident, evicted->nrPartsRemaining );
+				free(evicted->data);
+				free(evicted);
+			}
+			nrUdpLists = arrayList_size(handle->udpPartLists);
+		}
+
+		udpPartList_pt udpPartList = calloc(1, sizeof(*udpPartList));
+		// Ask for at least one byte so that an empty message still gets a valid buffer.
+		char *data = calloc(header.total_msg_size > 0 ? header.total_msg_size : 1, sizeof(char));
+		if(udpPartList == NULL || data == NULL) {
+			fprintf(stderr, "ERROR: Out of memory, dropping large UDP part\n");
+			free(data);
+			free(udpPartList);
+			(void)recv(fd, &header, sizeof(header), 0);
+			pthread_mutex_unlock(&handle->dbLock);
+			return false;
+		}
+
+		udpPartList->msg_ident =  header.msg_ident;
+		udpPartList->msg_size =  header.total_msg_size;
+		// The datagram handled right now is not counted, so this is the number of parts still to come.
+		udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE;
+		udpPartList->data = data;
+
+		msg.msg_iov[1].iov_base = &udpPartList->data[header.offset];
+		msg.msg_iov[1].iov_len = header.part_msg_size;
+		if(recvmsg(fd, &msg, 0)<0){
+			free(udpPartList->data);
+			free(udpPartList);
+			result=false;
+		}
+		else{
+			arrayList_add(handle->udpPartLists, udpPartList);
+
+			if(udpPartList->nrPartsRemaining == 0) {
+				*index = nrUdpLists;
+				*size = udpPartList->msg_size;
+				result = true;
+			} else {
+				result = false;
+			}
+		}
+
+	}
+
+	pthread_mutex_unlock(&handle->dbLock);
+
+	return result;
+}
+
+//
+// Hand over the message that largeUdp_dataAvailable() reported as complete.
+// The entry at 'index' is taken out of the administration and its data buffer is stored in *buffer;
+// this function does not free that buffer. 'size' is not used.
+// Returns 0 on success, or -1 when no entry could be removed at 'index'.
+//
+int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size)
+{
+	int rc = -1;
+
+	pthread_mutex_lock(&handle->dbLock);
+	udpPartList_pt entry = arrayList_remove(handle->udpPartLists, index);
+	if(entry != NULL) {
+		*buffer = entry->data;
+		free(entry);
+		rc = 0;
+	}
+	pthread_mutex_unlock(&handle->dbLock);
+
+	return rc;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
new file mode 100644
index 0000000..a21e654
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/large_udp.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * large_udp.h
+ *
+ *  \date       Mar 1, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef _LARGE_UDP_H_
+#define _LARGE_UDP_H_
+#include <stdbool.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+/* Opaque handle; the struct is defined in large_udp.c. */
+typedef struct largeUdp  *largeUdp_pt;
+
+/* Creates a handle that tracks at most maxNrUdpReceptions messages under reassembly. */
+largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions);
+/* Frees the handle and the messages it still tracks. */
+void largeUdp_destroy(largeUdp_pt handle);
+
+/* Sends 'count' bytes from 'buf' as one or more datagrams, each prefixed with a part header. Returns the bytes sent or -1. */
+int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
+/* Same as largeUdp_sendto(), but the message is given as 'len' scattered buffers. */
+int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen);
+/* Receives one datagram from 'fd'; returns true and fills *index and *size once a message is complete. */
+bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size);
+/* Removes the completed message at 'index' and stores its data buffer in *buffer. Returns 0 or -1. */
+int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size);
+
+#endif /* _LARGE_UDP_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
new file mode 100644
index 0000000..cd4ee07
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -0,0 +1,141 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+#include "service_tracker.h"
+
+#include "pubsub_admin_impl.h"
+
+/* Per-bundle state that is passed between the bundleActivator_* callbacks as userData. */
+struct activator {
+	/* Admin instance created in bundleActivator_create() and destroyed in bundleActivator_destroy(). */
+	pubsub_admin_pt admin;
+	/* Service struct allocated in bundleActivator_start() and freed in bundleActivator_stop(). */
+	pubsub_admin_service_pt adminService;
+	/* Registration obtained for adminService in bundleActivator_start(). */
+	service_registration_pt registration;
+	/* Tracker for PUBSUB_SERIALIZER_SERVICE; reports to pubsubAdmin_serializerAdded/Removed. */
+	service_tracker_pt serializerTracker;
+};
+
+/*
+ * Allocates the activator, creates the pubsub admin and a tracker for serializer services.
+ *
+ * On success *userData is set to the new activator. On any failure everything that was
+ * created so far is released again and *userData is set to NULL, so no half-initialised
+ * activator is handed out.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the activator cannot be allocated, or the
+ * status of the step that failed.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator;
+
+	*userData = NULL;
+
+	activator = calloc(1, sizeof(*activator));
+	if (!activator) {
+		return CELIX_ENOMEM;
+	}
+
+	status = pubsubAdmin_create(context, &(activator->admin));
+
+	if(status == CELIX_SUCCESS){
+		service_tracker_customizer_pt customizer = NULL;
+		status = serviceTrackerCustomizer_create(activator->admin,
+				NULL,
+				pubsubAdmin_serializerAdded,
+				NULL,
+				pubsubAdmin_serializerRemoved,
+				&customizer);
+		if(status == CELIX_SUCCESS){
+			status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+			if(status != CELIX_SUCCESS){
+				serviceTrackerCustomizer_destroy(customizer);
+			}
+		}
+
+		if(status != CELIX_SUCCESS){
+			pubsubAdmin_destroy(activator->admin);
+			activator->admin = NULL;
+		}
+	}
+
+	if(status == CELIX_SUCCESS){
+		*userData = activator;
+	}
+	else{
+		// Nothing refers to the activator yet, so it can be released here.
+		free(activator);
+	}
+
+	return status;
+}
+
+/*
+ * Builds the pubsub admin service struct, registers it under PUBSUB_ADMIN_SERVICE and
+ * opens the serializer tracker.
+ *
+ * Returns CELIX_ENOMEM when the service struct cannot be allocated; otherwise the sum of
+ * the registration status and the tracker-open status.
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+	pubsub_admin_service_pt svc = calloc(1, sizeof(*svc));
+
+	if (svc == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	svc->admin = act->admin;
+
+	svc->addPublication = pubsubAdmin_addPublication;
+	svc->removePublication = pubsubAdmin_removePublication;
+	svc->addSubscription = pubsubAdmin_addSubscription;
+	svc->removeSubscription = pubsubAdmin_removeSubscription;
+	svc->closeAllPublications = pubsubAdmin_closeAllPublications;
+	svc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+	svc->matchEndpoint = pubsubAdmin_matchEndpoint;
+
+	/* Kept in the activator so that bundleActivator_stop() can free it. */
+	act->adminService = svc;
+
+	celix_status_t status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, svc, NULL, &act->registration);
+	status += serviceTracker_open(act->serializerTracker);
+
+	return status;
+}
+
+/*
+ * Closes the serializer tracker, unregisters the admin service and frees the service struct.
+ * Returns the sum of the tracker-close status and the unregister status.
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	celix_status_t status = serviceTracker_close(act->serializerTracker);
+	status += serviceRegistration_unregister(act->registration);
+	act->registration = NULL;
+
+	free(act->adminService);
+	act->adminService = NULL;
+
+	return status;
+}
+
+/*
+ * Destroys the serializer tracker and the pubsub admin, then frees the activator.
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	serviceTracker_destroy(act->serializerTracker);
+
+	pubsubAdmin_destroy(act->admin);
+	act->admin = NULL;
+
+	free(act);
+
+	return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
new file mode 100644
index 0000000..1e3cef0
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -0,0 +1,1160 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#ifndef ANDROID
+#include <ifaddrs.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <assert.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "celix_threads.h"
+#include "service_factory.h"
+
+#include "pubsub_admin_impl.h"
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub_endpoint.h"
+#include "pubsub/subscriber.h"
+#include "pubsub_admin_match.h"
+
+/* Multicast address used by pubsubAdmin_create() when no multicast IP could be determined. */
+static const char *DEFAULT_MC_IP = "224.100.1.1";
+/* First two octets of the multicast address when the PSA_MULTICAST_IP_PREFIX property is not set. */
+static char *DEFAULT_MC_PREFIX = "224.100";
+
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
+
+/*
+ * Creates and initialises a UDP multicast pubsub admin.
+ *
+ * The multicast address is taken from the PSA_IP property. When that property is not set
+ * (and ANDROID is not defined) the address is built from the multicast prefix and the last
+ * two octets of the interface IP address, and the send socket is created and configured.
+ * Scores and verbosity start at their PSA_UDPMC_DEFAULT_* values and can be overridden
+ * through bundle context properties.
+ *
+ * On success *admin points to the new admin. On failure everything allocated here is
+ * released and *admin is set to NULL.
+ *
+ * Returns CELIX_SUCCESS, CELIX_ENOMEM when the admin cannot be allocated, or
+ * CELIX_SERVICE_EXCEPTION when the send socket cannot be created or configured.
+ */
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	*admin = calloc(1, sizeof(**admin));
+
+	if (!*admin) {
+		return CELIX_ENOMEM;
+	}
+
+	char *mc_ip = NULL;
+	char *if_ip = NULL;
+	int sendSocket = -1;
+
+	if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+		logHelper_start((*admin)->loghelper);
+	}
+	const char *mc_ip_prop = NULL;
+	bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
+	if(mc_ip_prop) {
+		mc_ip = strdup(mc_ip_prop);
+	}
+
+#ifndef ANDROID
+	if (mc_ip == NULL) {
+		const char *mc_prefix = NULL;
+		const char *interface = NULL;
+		int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+		bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
+		if(mc_prefix == NULL) {
+			mc_prefix = DEFAULT_MC_PREFIX;
+		}
+
+		bundleContext_getProperty(context, PSA_ITF, &interface);
+		if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
+			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
+		}
+
+		/* if_ip may still be NULL here; passing NULL for %s is undefined behaviour. */
+		printf("IP Detected : %s\n", if_ip != NULL ? if_ip : "(null)");
+		if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
+			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
+			b2 = 1;
+			b3 = 1;
+		}
+
+		if(asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3) < 0) {
+			/* The pointer is unspecified after a failed asprintf; fall back to DEFAULT_MC_IP below. */
+			mc_ip = NULL;
+		}
+
+		sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+		if(sendSocket == -1) {
+			perror("pubsubAdmin_create:socket");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+
+		if(status == CELIX_SUCCESS){
+			char loop = 1;
+			if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
+				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+		}
+
+		if(status == CELIX_SUCCESS){
+			struct in_addr multicast_interface;
+			if(if_ip == NULL) {
+				/* No interface address is known: leave the outgoing multicast interface at the system default. */
+				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No interface IP address available, not setting IP_MULTICAST_IF");
+			}
+			else if(inet_aton(if_ip, &multicast_interface) == 0) {
+				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_ERROR, "PSA_UDP_MC: Invalid interface IP address %s", if_ip);
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+			else if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
+				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+		}
+
+	}
+
+
+	if(status != CELIX_SUCCESS){
+		logHelper_stop((*admin)->loghelper);
+		logHelper_destroy(&((*admin)->loghelper));
+		if(sendSocket >=0){
+			close(sendSocket);
+		}
+		free(if_ip);
+		free(mc_ip);
+		/* Release the admin as well so that the caller is not left with a half-built instance. */
+		free(*admin);
+		*admin = NULL;
+		return status;
+	}
+	else{
+		(*admin)->sendSocket = sendSocket;
+	}
+
+#endif
+
+	(*admin)->bundle_context= context;
+	(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+	(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
+	arrayList_create(&((*admin)->noSerializerSubscriptions));
+	arrayList_create(&((*admin)->noSerializerPublications));
+	arrayList_create(&((*admin)->serializerList));
+
+	celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+	celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+	celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+	celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+	celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+	/* These two locks are created with the recursive mutex type. */
+	celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
+	celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+	celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
+
+	celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+	celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+	celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+	if (if_ip != NULL) {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
+		(*admin)->ifIpAddress = if_ip;
+	} else {
+		(*admin)->ifIpAddress = strdup("127.0.0.1");
+	}
+
+	if (mc_ip != NULL) {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
+		(*admin)->mcIpAddress = mc_ip;
+	}
+	else {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
+		(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
+	}
+
+	(*admin)->defaultScore = PSA_UDPMC_DEFAULT_SCORE;
+	(*admin)->qosSampleScore = PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE;
+	(*admin)->qosControlScore = PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE;
+
+	const char *defaultScoreStr = NULL;
+	const char *sampleScoreStr = NULL;
+	const char *controlScoreStr = NULL;
+	bundleContext_getProperty(context, PSA_UDPMC_DEFAULT_SCORE_KEY, &defaultScoreStr);
+	bundleContext_getProperty(context, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+	bundleContext_getProperty(context, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+	if (defaultScoreStr != NULL) {
+		(*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+	}
+	if (sampleScoreStr != NULL) {
+		(*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+	}
+	if (controlScoreStr != NULL) {
+		(*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+	}
+
+	(*admin)->verbose = PSA_UDPMC_DEFAULT_VERBOSE;
+	const char *verboseStr = NULL;
+	bundleContext_getProperty(context, PSA_UDPMC_VERBOSE_KEY, &verboseStr);
+	if (verboseStr != NULL) {
+		/* Only the first four characters are compared, case-insensitively. */
+		(*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+	}
+
+	return status;
+}
+
+
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+{
+	celix_status_t status = CELIX_SUCCESS;
+
+	free(admin->mcIpAddress);
+	free(admin->ifIpAddress);
+
+	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+	hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
+	while(hashMapIterator_hasNext(iter)){
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		free((char*)hashMapEntry_getKey(entry));
+		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+	}
+	hashMapIterator_destroy(iter);
+	hashMap_destroy(admin->pendingSubscriptions,false,false);
+	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	hashMap_destroy(admin->subscriptions,false,false);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	hashMap_destroy(admin->localPublications,true,false);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	celixThreadMutex_lock(&admin->externalPublicationsLock);
+	iter = hashMapIterator_create(admin->externalPublications);
+	while(hashMapIterator_hasNext(iter)){
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		free((char*)hashMapEntry_getKey(entry));
+		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+	}
+	hashMapIterator_destroy(iter);
+	hashMap_destroy(admin->externalPublications,false,false);
+	celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+	celixThreadMutex_lock(&admin->serializerListLock);
+	arrayList_destroy(admin->serializerList);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+
+	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+	arrayList_destroy(admin->noSerializerSubscriptions);
+	arrayList_destroy(admin->noSerializerPublications);
+	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+	celixThreadMutex_lock(&admin->usedSerializersLock);
+
+	iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+	while(hashMapIterator_hasNext(iter)){
+		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+	}
+	hashMapIterator_destroy(iter);
+	hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
+
+	iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+	while(hashMapIterator_hasNext(iter)){
+		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+	}
+	hashMapIterator_destroy(iter);
+	hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
+
+	celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+	celixThreadMutex_destroy(&admin->usedSerializersLock);
+	celixThreadMutex_destroy(&admin->serializerListLock);
+
+	celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
+	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+
+	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+
+	celixThreadMutex_destroy(&admin->subscriptionsLock);
+	celixThreadMutex_destroy(&admin->localPublicationsLock);
+	celixThreadMutex_destroy(&admin->externalPublicationsLock);
+
+	logHelper_stop(admin->loghelper);
+
+	logHelper_destroy(&admin->loghelper);
+
+	free(admin);
+
+	return status;
+}
+
+/*
+ * Creates the single "ANY" topic subscription (key PUBSUB_ANY_SUB_TOPIC) on behalf of subEP.
+ *
+ * The whole function runs with admin->subscriptionsLock held. If an ANY subscription is
+ * already stored in admin->subscriptions nothing is done and CELIX_SUCCESS is returned.
+ * Otherwise:
+ *  - the best serializer for subEP is looked up; if none is found, subEP is appended to
+ *    admin->noSerializerSubscriptions and the lookup status is returned;
+ *  - the new subscription is connected to the URL of every local and every external
+ *    publisher endpoint that has a PUBSUB_ENDPOINT_URL property, subEP is added as
+ *    subscriber and the subscription is started;
+ *  - only if every step succeeded, the subscription is stored in admin->subscriptions
+ *    (under a strdup'ed key) and linked to its serializer.
+ *
+ * The status codes of the individual steps are summed with '+=', so a non-zero return
+ * value means "at least one step failed".
+ *
+ * NOTE(review): when connecting or starting fails, any_sub is neither stored nor destroyed
+ * in this function -- looks like it may leak; confirm.
+ */
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+
+	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+
+	if(any_sub==NULL){
+
+		int i;
+		pubsub_serializer_service_t *best_serializer = NULL;
+		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, NULL)) == CELIX_SUCCESS){
+			status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
+		}
+		else{
+			if (admin->verbose) {
+				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			}
+
+			/* Park the endpoint until a serializer shows up (see pubsubAdmin_serializerAdded) */
+			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+			arrayList_add(admin->noSerializerSubscriptions,subEP);
+			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+		}
+
+		if (status == CELIX_SUCCESS){
+
+			/* Connect all internal publishers */
+			celixThreadMutex_lock(&admin->localPublicationsLock);
+			hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
+			while(hashMapIterator_hasNext(lp_iter)){
+				service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
+				topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+				array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+				if(topic_publishers!=NULL){
+					for(i=0;i<arrayList_size(topic_publishers);i++){
+						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						}
+					}
+					/* The publisher list is destroyed here by the caller (the external lists below are not) */
+					arrayList_destroy(topic_publishers);
+				}
+			}
+			hashMapIterator_destroy(lp_iter);
+			celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+			/* Connect also all external publishers */
+			celixThreadMutex_lock(&admin->externalPublicationsLock);
+			hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
+			while(hashMapIterator_hasNext(extp_iter)){
+				array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
+				if(ext_pub_list!=NULL){
+					for(i=0;i<arrayList_size(ext_pub_list);i++){
+						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						}
+					}
+				}
+			}
+			hashMapIterator_destroy(extp_iter);
+			celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+
+			pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+
+			status += pubsub_topicSubscriptionStart(any_sub);
+
+		}
+
+		/* Register the subscription only when every previous step succeeded */
+		if (status == CELIX_SUCCESS){
+			hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+			connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false);
+		}
+
+	}
+
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+}
+
+/*
+ * Registers the subscriber endpoint subEP with the admin.
+ *
+ * - If the endpoint's topic name equals PUBSUB_ANY_SUB_TOPIC the call is delegated to
+ *   pubsubAdmin_addAnySubscription().
+ * - Otherwise the four locks pendingSubscriptions -> subscriptions -> localPublications ->
+ *   externalPublications are taken in that order (and released in reverse order).
+ * - If neither a local publication factory nor an external publisher list is known for the
+ *   scope/topic key, subEP is put on the pending list.
+ * - If publishers are known but no topic subscription exists yet, one is created with the
+ *   best serializer (or subEP is parked in admin->noSerializerSubscriptions when no
+ *   serializer is found), connected to every known publisher URL, given subEP as
+ *   subscriber, started and stored in admin->subscriptions.
+ * - On success the subscriber counter of the topic subscription is increased.
+ *
+ * Step statuses are summed with '+='; the (possibly non-zero) sum is returned.
+ *
+ * NOTE(review): the topic name returned by properties_get() is passed to strcmp() without
+ * a NULL check -- assumes every subscriber endpoint carries PUBSUB_ENDPOINT_TOPIC_NAME.
+ * NOTE(review): when a topic subscription already exists, subEP is not handed to
+ * pubsub_topicSubscriptionAddSubscriber() here; only the counter is increased -- confirm
+ * this is intended.
+ * NOTE(review): the verbose "Added subscription" message is printed regardless of status.
+ */
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+	celix_status_t status = CELIX_SUCCESS;
+
+	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
+		return pubsubAdmin_addAnySubscription(admin,subEP);
+	}
+
+	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	celixThreadMutex_lock(&admin->externalPublicationsLock);
+
+	/* scope_topic is a temporary lookup key; it is freed before the locks are released */
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+
+	if (factory==NULL && ext_pub_list==NULL) { //No (local or external) publishers yet for this topic
+		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+	} else {
+		int i;
+		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
+
+		if (subscription == NULL) {
+			pubsub_serializer_service_t *best_serializer = NULL;
+            const char *serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+			} else {
+				if (admin->verbose) {
+					printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+						   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+				}
+
+				/* Park the endpoint until a serializer shows up (see pubsubAdmin_serializerAdded) */
+				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+				arrayList_add(admin->noSerializerSubscriptions,subEP);
+				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+			}
+
+			if (status==CELIX_SUCCESS){
+
+				/* Try to connect internal publishers */
+				if(factory!=NULL){
+					topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+					array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+					if(topic_publishers!=NULL){
+						for(i=0;i<arrayList_size(topic_publishers);i++){
+							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+							if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+							}
+						}
+						arrayList_destroy(topic_publishers);
+					}
+
+				}
+
+				/* Look also for external publishers */
+				if(ext_pub_list!=NULL){
+					for(i=0;i<arrayList_size(ext_pub_list);i++){
+						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+						}
+					}
+				}
+
+				pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+				status += pubsub_topicSubscriptionStart(subscription);
+
+			}
+
+			/* Store the new subscription only when create/connect/start all succeeded */
+			if(status==CELIX_SUCCESS){
+
+				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+
+				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
+			}
+		}
+
+		/* Counted for both a freshly created and an already existing subscription */
+		if (status == CELIX_SUCCESS){
+			pubsub_topicIncreaseNrSubscribers(subscription);
+		}
+	}
+
+	free(scope_topic);
+	celixThreadMutex_unlock(&admin->externalPublicationsLock);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+	return status;
+
+}
+
+/*
+ * Unregisters the subscriber endpoint subEP.
+ *
+ * When a topic subscription exists for the endpoint's scope/topic key, its subscriber
+ * counter is decreased and, once the counter reaches zero, subEP is removed from the
+ * subscription. When no topic subscription exists, subEP is expected to be parked in
+ * admin->noSerializerSubscriptions; if it is not found there CELIX_ILLEGAL_STATE is
+ * returned.
+ */
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+	celix_status_t result = CELIX_SUCCESS;
+
+	if (admin->verbose) {
+		printf("PSA_UDPMC: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+			properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+			properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+		printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	}
+
+	/* Temporary lookup key, released at the end of the function */
+	char *lookupKey = pubsubEndpoint_createScopeTopicKey(
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	topic_subscription_pt topicSub = (topic_subscription_pt)hashMap_get(admin->subscriptions, lookupKey);
+	bool hasTopicSub = (topicSub != NULL);
+	if (hasTopicSub) {
+		pubsub_topicDecreaseNrSubscribers(topicSub);
+		/* The endpoint is only detached once the last subscriber is gone */
+		if (pubsub_topicGetNrSubscribers(topicSub) == 0) {
+			result = pubsub_topicSubscriptionRemoveSubscriber(topicSub, subEP);
+		}
+	}
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	if (!hasTopicSub) {
+		/* Maybe the endpoint was pending */
+		celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+		bool wasPending = arrayList_removeElement(admin->noSerializerSubscriptions, subEP);
+		if (!wasPending) {
+			result = CELIX_ILLEGAL_STATE;
+		}
+		celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+	}
+
+	free(lookupKey);
+
+	return result;
+}
+
+/*
+ * Registers the publisher endpoint pubEP with the admin.
+ *
+ * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read from the
+ * bundle context. Otherwise:
+ *  1. For endpoints of this framework ("own", framework UUIDs compared over at most 128
+ *     characters) the PUBSUB_ADMIN_TYPE_KEY property is set to PSA_UDPMC_PUBSUB_ADMIN_TYPE.
+ *  2. An endpoint of this framework without a PUBSUB_ENDPOINT_URL is handled as a local
+ *     publication: if no factory exists for the scope/topic key, a topic publication is
+ *     created with the best serializer and started (or pubEP is parked in
+ *     admin->noSerializerPublications when no serializer is found); if a factory exists,
+ *     pubEP is added to the existing topic publication.
+ *     Every other endpoint is appended to the external publisher list of its topic.
+ *  3. Subscriptions pending for this scope/topic are re-submitted through
+ *     pubsubAdmin_addSubscription() and the pending entry is removed.
+ *  4. If pubEP has a URL, a connect request is queued on the matching topic subscription
+ *     and on the ANY subscription, when those exist.
+ *
+ * NOTE(review): epFwUUID comes straight from properties_get() and is passed to strncmp()
+ * and strcmp() without a NULL check -- assumes every endpoint carries a framework UUID.
+ * NOTE(review): step 3 calls pubsubAdmin_addSubscription() while pendingSubscriptionsLock
+ * is held, and that function locks the same mutex again -- presumably the mutex is
+ * recursive (the struct has a pendingSubscriptionsAttr); confirm.
+ * NOTE(review): if no publisher ended up registered for scope_topic (e.g. the no-serializer
+ * path in step 2), pubsubAdmin_addSubscription() would append subEP to the very pending
+ * list that is being iterated in step 3 -- TODO confirm whether that can happen.
+ */
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+
+    const char* fwUUID = NULL;
+    bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+    if(fwUUID==NULL){
+        printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
+
+    if (isOwn) {
+        //should be null, will be set in this call
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+    }
+
+    if (isOwn) {
+        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+    }
+
+	/* Temporary lookup key; freed near the end of the function */
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	/* Local publication: endpoint of this framework that has no URL assigned yet */
+	if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+
+		celixThreadMutex_lock(&admin->localPublicationsLock);
+
+		service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
+
+		if (factory == NULL) {
+			topic_publication_pt pub = NULL;
+			pubsub_serializer_service_t *best_serializer = NULL;
+			const char* serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
+                if (isOwn) {
+                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                }
+			} else {
+				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+				/* Park the endpoint until a serializer shows up (see pubsubAdmin_serializerAdded) */
+				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+				arrayList_add(admin->noSerializerPublications,pubEP);
+				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+			}
+
+			if (status == CELIX_SUCCESS) {
+				status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+				if (status == CELIX_SUCCESS && factory != NULL) {
+					hashMap_put(admin->localPublications, strdup(scope_topic), factory);
+					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
+				}
+			} else {
+				/* Reached for both a failed create and the no-serializer case above */
+				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+					   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+			}
+		} else {
+			//just add the new EP to the list
+			topic_publication_pt pub = (topic_publication_pt) factory->handle;
+			pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+		}
+
+		celixThreadMutex_unlock(&admin->localPublicationsLock);
+	} else {
+
+		/* External publication: remember the endpoint in the per-topic list */
+		celixThreadMutex_lock(&admin->externalPublicationsLock);
+		array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
+		if (ext_pub_list == NULL) {
+			arrayList_create(&ext_pub_list);
+			hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
+		}
+
+		arrayList_add(ext_pub_list, pubEP);
+
+		celixThreadMutex_unlock(&admin->externalPublicationsLock);
+	}
+
+	/* Re-evaluate the pending subscriptions */
+	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+	hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+	if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
+		char* topic = (char*) hashMapEntry_getKey(pendingSub);
+		array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
+		int i;
+		for (i = 0; i < arrayList_size(pendingSubList); i++) {
+			pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+			pubsubAdmin_addSubscription(admin, subEP);
+		}
+		/* Drop the map entry first, then release the list and the key the map was holding */
+		hashMap_remove(admin->pendingSubscriptions, scope_topic);
+		arrayList_clear(pendingSubList);
+		arrayList_destroy(pendingSubList);
+		free(topic);
+	}
+
+	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+	/* Connect the new publisher to the subscription for his topic, if there is any */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+
+	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
+	if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	}
+
+	/* And check also for ANY subscription */
+	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+	if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+	}
+
+	free(scope_topic);
+
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+               isOwn);
+    }
+
+	return status;
+
+}
+
+/*
+ * Unregisters the publisher endpoint pubEP.
+ *
+ * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read from the
+ * bundle context. Otherwise:
+ *  - an endpoint of this framework is removed from its local topic publication; when no
+ *    publication factory exists for the topic, the endpoint is expected to be parked in
+ *    admin->noSerializerPublications and CELIX_ILLEGAL_STATE is returned if it is not;
+ *  - an endpoint of another framework is removed from the external publisher list of its
+ *    topic, and the list itself is dropped once it is empty;
+ *  - if pubEP has a URL and no remaining external publisher of the topic shares that URL,
+ *    a disconnect request is queued on the matching topic subscription and on the ANY
+ *    subscription, when those exist.
+ */
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+	int count = 0; /* remaining external publishers that share pubEP's URL */
+
+    if (admin->verbose) {
+        /* Log text fixed: this function removes, it does not add */
+        printf("PSA_UDPMC: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+	const char* fwUUID = NULL;
+
+	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+	if(fwUUID==NULL){
+		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+		return CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+	/* Temporary lookup key; freed at the end of the function */
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	/* May be NULL: the endpoint URL is an optional property (it is NULL-checked below) */
+	const char *pubUrl = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+
+	if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+
+		celixThreadMutex_lock(&admin->localPublicationsLock);
+		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+		if(factory!=NULL){
+			topic_publication_pt pub = (topic_publication_pt)factory->handle;
+			pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
+		}
+		celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+		if(factory==NULL){
+			/* Maybe the endpoint was pending */
+			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+			if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+				status = CELIX_ILLEGAL_STATE;
+			}
+			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+		}
+
+	}
+	else{
+
+		celixThreadMutex_lock(&admin->externalPublicationsLock);
+		array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+		if(ext_pub_list!=NULL){
+			int i;
+			bool found = false;
+			/* Remove the first list entry that equals pubEP */
+			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
+				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+				found = pubsubEndpoint_equals(pubEP,p);
+				if (found){
+					arrayList_remove(ext_pub_list,i);
+				}
+			}
+			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+			for(i=0; i<arrayList_size(ext_pub_list);i++) {
+				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+				const char *otherUrl = properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL);
+				/* strcmp() on a NULL pointer is undefined behaviour; endpoints without URL never match */
+				if (pubUrl != NULL && otherUrl != NULL && strcmp(pubUrl, otherUrl) == 0) {
+					count++;
+				}
+			}
+
+			if(arrayList_size(ext_pub_list)==0){
+				/* Last publisher of this topic is gone: drop the map entry, the list and the stored key */
+				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+				char* topic = (char*)hashMapEntry_getKey(entry);
+				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+				hashMap_remove(admin->externalPublications,topic);
+				arrayList_destroy(list);
+				free(topic);
+			}
+		}
+
+		celixThreadMutex_unlock(&admin->externalPublicationsLock);
+	}
+
+	/* Check if this publisher was connected to one of our subscribers*/
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+
+	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+	if(sub!=NULL && pubUrl!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) pubUrl);
+	}
+
+	/* And check also for ANY subscription */
+	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+	if(any_sub!=NULL && pubUrl!=NULL && count == 0){
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) pubUrl);
+	}
+
+	free(scope_topic);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+
+}
+
+/*
+ * Tears down the local topic publication registered for the given scope/topic, if any:
+ * the publication is stopped, detached from its serializer and destroyed, and its entry
+ * (key and factory) is removed from admin->localPublications and freed.
+ * Runs under admin->localPublicationsLock. The statuses of stop and destroy are summed.
+ */
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){
+	celix_status_t result = CELIX_SUCCESS;
+
+	printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
+
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+
+	char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+	hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications, lookupKey);
+
+	if (entry != NULL) {
+		/* Grab key and value before the entry disappears from the map */
+		char *storedKey = (char*)hashMapEntry_getKey(entry);
+		service_factory_pt svcFactory = (service_factory_pt)hashMapEntry_getValue(entry);
+		topic_publication_pt publication = (topic_publication_pt)svcFactory->handle;
+
+		result += pubsub_topicPublicationStop(publication);
+		disconnectTopicPubSubFromSerializer(admin, publication, true);
+		result += pubsub_topicPublicationDestroy(publication);
+
+		hashMap_remove(admin->localPublications, lookupKey);
+		free(storedKey);
+		free(svcFactory);
+	}
+
+	free(lookupKey);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	return result;
+}
+
+/*
+ * Tears down the topic subscription registered for the given scope/topic, if any:
+ * the subscription is stopped, detached from its serializer and destroyed, and its entry
+ * is removed from admin->subscriptions (the stored key is freed).
+ * Runs under admin->subscriptionsLock. The statuses of stop and destroy are summed.
+ */
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){
+	celix_status_t result = CELIX_SUCCESS;
+
+	printf("PSA_UDP_MC: Closing all subscriptions\n");
+
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+
+	char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+	hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions, lookupKey);
+
+	if (entry != NULL) {
+		/* Grab key and value before the entry disappears from the map */
+		char *storedKey = (char*)hashMapEntry_getKey(entry);
+		topic_subscription_pt subscription = (topic_subscription_pt)hashMapEntry_getValue(entry);
+
+		result += pubsub_topicSubscriptionStop(subscription);
+		disconnectTopicPubSubFromSerializer(admin, subscription, false);
+		result += pubsub_topicSubscriptionDestroy(subscription);
+
+		hashMap_remove(admin->subscriptions, storedKey);
+		free(storedKey);
+	}
+
+	free(lookupKey);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return result;
+}
+
+
+#ifndef ANDROID
+/*
+ * Finds the numeric IPv4 address of a network interface.
+ *
+ * interface: name of the interface to look for, or NULL to accept the first interface
+ *            that has an AF_INET address.
+ * ip:        on success receives a strdup'ed address string; the caller has to free it.
+ *            Not written when no address is found.
+ *
+ * Returns CELIX_SUCCESS when an address was found, CELIX_BUNDLE_EXCEPTION otherwise
+ * (including the case that getifaddrs() fails).
+ */
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {
+	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+	struct ifaddrs *addrList = NULL;
+	char host[NI_MAXHOST];
+
+	if (getifaddrs(&addrList) == -1) {
+		return status;
+	}
+
+	/* Walk the interface list until the first match */
+	struct ifaddrs *cur = addrList;
+	while (cur != NULL && status != CELIX_SUCCESS) {
+		if (cur->ifa_addr != NULL) {
+			bool resolved = getnameinfo(cur->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0;
+			if (resolved && cur->ifa_addr->sa_family == AF_INET) {
+				if (interface == NULL || strcmp(cur->ifa_name, interface) == 0) {
+					*ip = strdup(host);
+					status = CELIX_SUCCESS;
+				}
+			}
+		}
+		cur = cur->ifa_next;
+	}
+
+	freeifaddrs(addrList);
+
+	return status;
+}
+#endif
+
+/*
+ * Appends subEP to the list of pending subscriptions of its scope/topic key in
+ * admin->pendingSubscriptions, creating the list (stored under a strdup'ed key) on first
+ * use. Takes no lock itself. Always returns CELIX_SUCCESS.
+ */
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+	char *lookupKey = pubsubEndpoint_createScopeTopicKey(
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	array_list_pt waiting = hashMap_get(admin->pendingSubscriptions, lookupKey);
+	if (waiting == NULL) {
+		/* First pending subscriber for this topic: the map gets its own copy of the key */
+		arrayList_create(&waiting);
+		hashMap_put(admin->pendingSubscriptions, strdup(lookupKey), waiting);
+	}
+	arrayList_add(waiting, subEP);
+
+	free(lookupKey);
+
+	return CELIX_SUCCESS;
+}
+
+
+/*
+ * Service-tracker style callback invoked when a serializer service becomes available.
+ *
+ * handle:    the pubsub_admin_pt this callback belongs to.
+ * reference: service reference of the serializer; must carry PUBSUB_SERIALIZER_TYPE_KEY.
+ * service:   not used by this function.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type property.
+ * Otherwise the reference is appended to admin->serializerList and every endpoint parked
+ * in admin->noSerializerSubscriptions / admin->noSerializerPublications for which a
+ * serializer can now be found is re-submitted through pubsubAdmin_addSubscription() /
+ * pubsubAdmin_addPublication(); CELIX_SUCCESS is returned (the results of those calls are
+ * ignored).
+ *
+ * NOTE(review): endpoints that were re-submitted are not removed from the no-serializer
+ * lists in this function -- confirm whether that happens elsewhere, otherwise they are
+ * re-submitted again on every later serializerAdded call.
+ * NOTE(review): the add functions lock noSerializerPendingsLock on their own failure path
+ * while it is already held here -- presumably the mutex is recursive; confirm.
+ */
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
+	/* Assumption: serializers are all available at startup.
+	 * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
+
+	celix_status_t status = CELIX_SUCCESS;
+	int i=0;
+
+	const char *serType = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+	if(serType == NULL){
+		printf("PSA_UDPMC: Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+	celixThreadMutex_lock(&admin->serializerListLock);
+	arrayList_add(admin->serializerList, reference);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+
+	/* Now let's re-evaluate the pending */
+	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
+		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+		pubsub_serializer_service_t *best_serializer = NULL;
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
+		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
+			pubsubAdmin_addSubscription(admin, ep);
+		}
+	}
+
+	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
+		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+		pubsub_serializer_service_t *best_serializer = NULL;
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
+		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
+			pubsubAdmin_addPublication(admin, ep);
+		}
+	}
+
+	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+	if (admin->verbose) {
+		printf("PSA_UDP_MC: %s serializer added\n", serType);
+	}
+
+	return status;
+}
+
+/*
+ * Service-tracker style callback invoked when a serializer service goes away.
+ *
+ * handle:    the pubsub_admin_pt this callback belongs to.
+ * reference: service reference of the serializer; must carry PUBSUB_SERIALIZER_TYPE_KEY.
+ * service:   the serializer service pointer; it is the key under which the affected
+ *            topic publications/subscriptions are looked up.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type property,
+ * CELIX_SUCCESS otherwise. The reference is removed from admin->serializerList. Every
+ * topic publication and topic subscription that used this serializer is stopped, its
+ * endpoints are removed from the admin, stripped of their PUBSUB_ENDPOINT_URL property and
+ * parked in the matching no-serializer list, its map entry is dropped and the topic
+ * publication/subscription is destroyed.
+ *
+ * NOTE(review): pubList is used without a NULL check, while the other callers of
+ * pubsub_topicPublicationGetPublisherList() in this file do check for NULL -- confirm.
+ * NOTE(review): pubList is destroyed after use but subList is not -- confirm the ownership
+ * of the list returned by pubsub_topicSubscriptionGetSubscribersList().
+ */
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
+
+	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+	int i=0, j=0;
+	const char *serType = NULL;
+
+	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+	if(serType == NULL){
+		printf("Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	celixThreadMutex_lock(&admin->serializerListLock);
+	/* Remove the serializer from the list */
+	arrayList_removeElement(admin->serializerList, reference);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+
+	/* Detach both per-serializer lists from the maps; from here on they are owned by this function */
+	celixThreadMutex_lock(&admin->usedSerializersLock);
+	array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+	array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+	celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+	/* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
+	if(topicPubList!=NULL){
+		for(i=0;i<arrayList_size(topicPubList);i++){
+			topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
+			/* Stop the topic publication */
+			pubsub_topicPublicationStop(topicPub);
+			/* Get the endpoints that are going to be orphan */
+			array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
+			for(j=0;j<arrayList_size(pubList);j++){
+				pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
+				/* Remove the publication */
+				pubsubAdmin_removePublication(admin, pubEP);
+				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+				if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
+					properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+				}
+				/* Add the orphan endpoint to the noSerializer pending list */
+				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+				arrayList_add(admin->noSerializerPublications,pubEP);
+				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+			}
+			arrayList_destroy(pubList);
+
+			/* Cleanup also the localPublications hashmap*/
+			celixThreadMutex_lock(&admin->localPublicationsLock);
+			hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
+			char *key = NULL;
+			service_factory_pt factory = NULL;
+			/* Search by value: find the map entry whose factory wraps this topic publication */
+			while(hashMapIterator_hasNext(iter)){
+				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+				factory = (service_factory_pt)hashMapEntry_getValue(entry);
+				topic_publication_pt pub = (topic_publication_pt)factory->handle;
+				if(pub==topicPub){
+					key = (char*)hashMapEntry_getKey(entry);
+					break;
+				}
+			}
+			hashMapIterator_destroy(iter);
+			/* key is only set on a match, so factory is the matching one when it is freed here */
+			if(key!=NULL){
+				hashMap_remove(admin->localPublications, key);
+				free(factory);
+				free(key);
+			}
+			celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+			/* Finally destroy the topicPublication */
+			pubsub_topicPublicationDestroy(topicPub);
+		}
+		arrayList_destroy(topicPubList);
+	}
+
+	/* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
+	if(topicSubList!=NULL){
+		for(i=0;i<arrayList_size(topicSubList);i++){
+			topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
+			/* Stop the topic subscription */
+			pubsub_topicSubscriptionStop(topicSub);
+			/* Get the endpoints that are going to be orphan */
+			array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
+			for(j=0;j<arrayList_size(subList);j++){
+				pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
+				/* Remove the subscription */
+				pubsubAdmin_removeSubscription(admin, subEP);
+				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+				if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
+					properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+				}
+				/* Add the orphan endpoint to the noSerializer pending list */
+				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+				arrayList_add(admin->noSerializerSubscriptions,subEP);
+				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+			}
+
+			/* Cleanup also the subscriptions hashmap*/
+			celixThreadMutex_lock(&admin->subscriptionsLock);
+			hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
+			char *key = NULL;
+			/* Search by value: find the map entry that holds this topic subscription */
+			while(hashMapIterator_hasNext(iter)){
+				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+				topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
+				if(sub==topicSub){
+					key = (char*)hashMapEntry_getKey(entry);
+					break;
+				}
+			}
+			hashMapIterator_destroy(iter);
+			if(key!=NULL){
+				hashMap_remove(admin->subscriptions, key);
+				free(key);
+			}
+			celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+			/* Finally destroy the topicSubscription */
+			pubsub_topicSubscriptionDestroy(topicSub);
+		}
+		arrayList_destroy(topicSubList);
+	}
+
+	if (admin->verbose) {
+		printf("PSA_UDP_MC: %s serializer removed\n", serType);
+	}
+
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Computes how well this admin matches the given endpoint by delegating to
+ * pubsub_admin_match() with the admin's configured scores and its current serializer
+ * list (read under admin->serializerListLock). The resulting score is written to *score.
+ * Returns CELIX_ILLEGAL_STATE when the framework UUID cannot be read from the bundle
+ * context, otherwise the status of pubsub_admin_match().
+ */
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
+	const char *frameworkUuid = NULL;
+
+	bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &frameworkUuid);
+	if (frameworkUuid == NULL) {
+		return CELIX_ILLEGAL_STATE;
+	}
+
+	celixThreadMutex_lock(&admin->serializerListLock);
+	celix_status_t matchStatus = pubsub_admin_match(endpoint,
+			PSA_UDPMC_PUBSUB_ADMIN_TYPE,
+			frameworkUuid,
+			admin->qosSampleScore,
+			admin->qosControlScore,
+			admin->defaultScore,
+			admin->serializerList,
+			score);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+
+	return matchStatus;
+}
+
+/*
+ * Selects the best serializer for endpoint ep; this recalls the same logic as the match
+ * function.
+ *
+ * out:     receives the serializer service, or NULL when no serializer reference was
+ *          selected.
+ * serType: optional (may be NULL); when given and a reference was selected, receives the
+ *          PUBSUB_SERIALIZER_TYPE_KEY property of that reference.
+ *
+ * Returns the status of pubsub_admin_get_best_serializer().
+ */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){
+	service_reference_pt bestRef = NULL;
+	pubsub_serializer_service_t *resolved = NULL;
+
+	celixThreadMutex_lock(&admin->serializerListLock);
+	celix_status_t status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &bestRef);
+	celixThreadMutex_unlock(&admin->serializerListLock);
+
+	if (bestRef == NULL) {
+		*out = NULL;
+		return status;
+	}
+
+	bundleContext_getService(admin->bundle_context, bestRef, (void**)&resolved);
+	bundleContext_ungetService(admin->bundle_context, bestRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+	if (serType != NULL) {
+		serviceReference_getProperty(bestRef, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+	}
+
+	*out = resolved;
+
+	return status;
+}
+
+/*
+ * Records that topicPubSub (a topic publication when isPublication is true, a topic
+ * subscription otherwise) uses the given serializer, by appending it to the serializer's
+ * list in the matching per-serializer map. The list is created on first use.
+ * Runs under admin->usedSerializersLock.
+ */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+	hash_map_pt perSerializer;
+
+	celixThreadMutex_lock(&admin->usedSerializersLock);
+
+	if (isPublication) {
+		perSerializer = admin->topicPublicationsPerSerializer;
+	} else {
+		perSerializer = admin->topicSubscriptionsPerSerializer;
+	}
+
+	array_list_pt users = (array_list_pt)hashMap_get(perSerializer, serializer);
+	if (users == NULL) {
+		arrayList_create(&users);
+		hashMap_put(perSerializer, serializer, users);
+	}
+	arrayList_add(users, topicPubSub);
+
+	celixThreadMutex_unlock(&admin->usedSerializersLock);
+}
+
+/*
+ * Removes topicPubSub (a topic publication when isPublication is true, a topic
+ * subscription otherwise) from the per-serializer bookkeeping. The lists of all
+ * serializers are tried in turn and the search stops at the first list the element was
+ * removed from. Runs under admin->usedSerializersLock.
+ */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+	hash_map_pt perSerializer;
+	bool removed = false;
+
+	celixThreadMutex_lock(&admin->usedSerializersLock);
+
+	if (isPublication) {
+		perSerializer = admin->topicPublicationsPerSerializer;
+	} else {
+		perSerializer = admin->topicSubscriptionsPerSerializer;
+	}
+
+	hash_map_iterator_pt it = hashMapIterator_create(perSerializer);
+	while (!removed && hashMapIterator_hasNext(it)) {
+		array_list_pt users = (array_list_pt)hashMapIterator_nextValue(it);
+		if (arrayList_removeElement(users, topicPubSub)) { //Found it!
+			removed = true;
+		}
+	}
+	hashMapIterator_destroy(it);
+
+	celixThreadMutex_unlock(&admin->usedSerializersLock);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
new file mode 100644
index 0000000..3529a8f
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -0,0 +1,97 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.h
+ *
+ *  \date       Dec 5, 2013
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
+#define PUBSUB_ADMIN_UDP_MC_IMPL_H_
+
+#include "pubsub_psa_udpmc_constants.h"
+#include "pubsub_admin.h"
+#include "log_helper.h"
+
+struct pubsub_admin {
+
+	bundle_context_pt bundle_context; // bundle context used for service lookups (bundleContext_getService/ungetService)
+	log_helper_pt loghelper;
+
+	/* List of the available serializers */
+	celix_thread_mutex_t serializerListLock; // presumably guards serializerList (inferred from the name) - TODO confirm
+	array_list_pt serializerList; // List<serializers>
+
+	celix_thread_mutex_t localPublicationsLock;
+	hash_map_pt localPublications;//<topic(string),service_factory_pt>
+
+	celix_thread_mutex_t externalPublicationsLock;
+	hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
+
+	celix_thread_mutex_t subscriptionsLock;
+	hash_map_pt subscriptions; //<topic(string),topic_subscription>
+
+	celix_thread_mutex_t pendingSubscriptionsLock;
+	celix_thread_mutexattr_t pendingSubscriptionsAttr;
+	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
+
+	/* These keep track of valid subscriptions/publications that do not have a valid serializer yet */
+	celix_thread_mutex_t noSerializerPendingsLock;
+	celix_thread_mutexattr_t noSerializerPendingsAttr;
+	array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+	array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+	celix_thread_mutex_t usedSerializersLock; // held while the two per-serializer maps below are read or modified
+	hash_map_pt topicSubscriptionsPerSerializer; // <serializer,List<topicSubscription>>
+	hash_map_pt topicPublicationsPerSerializer; // <serializer,List<topicPublications>>
+
+	char* ifIpAddress; // The local interface which is used for multicast communication
+	char* mcIpAddress; // The multicast IP address
+
+	int sendSocket; // socket descriptor; presumably the one handed to pubsub_topicPublicationCreate - TODO confirm
+
+
+	double qosSampleScore;
+	double qosControlScore;
+	double defaultScore;
+
+	bool verbose;
+};
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+
+
+#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
new file mode 100644
index 0000000..2a02da8
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_PSA_UDPMC_CONSTANTS_H_
+#define PUBSUB_PSA_UDPMC_CONSTANTS_H_
+
+
+#define PSA_UDPMC_PUBSUB_ADMIN_TYPE	            "udp_mc" /* value stored under PUBSUB_ADMIN_TYPE_KEY on publisher endpoints */
+
+#define PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE 		70 /* presumably the fallback for pubsub_admin.qosSampleScore - TODO confirm */
+#define PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE 	30 /* presumably the fallback for pubsub_admin.qosControlScore - TODO confirm */
+#define PSA_UDPMC_DEFAULT_SCORE 				50 /* presumably the fallback for pubsub_admin.defaultScore - TODO confirm */
+
+#define PSA_UDPMC_QOS_SAMPLE_SCORE_KEY 			"PSA_UDPMC_QOS_SAMPLE_SCORE" /* presumably a configuration property name - TODO confirm */
+#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY 		"PSA_UDPMC_QOS_CONTROL_SCORE" /* presumably a configuration property name - TODO confirm */
+#define PSA_UDPMC_DEFAULT_SCORE_KEY 			"PSA_UDPMC_DEFAULT_SCORE" /* presumably a configuration property name - TODO confirm */
+
+#define PSA_UDPMC_DEFAULT_VERBOSE 				false /* presumably the fallback for pubsub_admin.verbose - TODO confirm */
+
+#define PSA_UDPMC_VERBOSE_KEY    			    "PSA_UDPMC_VERBOSE" /* presumably a configuration property name - TODO confirm */
+
+
+#endif /* PUBSUB_PSA_UDPMC_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
new file mode 100644
index 0000000..7e9bdbb
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -0,0 +1,453 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_registration.h"
+#include "utils.h"
+#include "service_factory.h"
+#include "version.h"
+
+#include "topic_publication.h"
+#include "pubsub_common.h"
+#include "pubsub/publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+#include "pubsub_psa_udpmc_constants.h"
+
+#define EP_ADDRESS_LEN		32
+
+#define FIRST_SEND_DELAY	2
+
+struct topic_publication {
+	int sendSocket; // descriptor used for sending; closed by pubsub_topicPublicationDestroy
+	char* endpoint; // heap-allocated "udp://<ip>:<port>" URL; freed by pubsub_topicPublicationDestroy
+	service_registration_pt svcFactoryReg; // filled in by pubsub_topicPublicationStart, unregistered by pubsub_topicPublicationStop
+	array_list_pt pub_ep_list; //List<pubsub_endpoint>
+	hash_map_pt boundServices; //<bundle_pt,bound_service>
+	celix_thread_mutex_t tp_lock; // taken around modifications of pub_ep_list and boundServices, and while sending
+	struct {
+		const char* type; // serializer type string; only the pointer is stored, it is never freed here
+		pubsub_serializer_service_t* svc; // serializer used to create/destroy the per-bundle message serializer maps
+	} serializer;
+	struct sockaddr_in destAddr; // destination address handed to largeUdp_sendmsg
+};
+
+typedef struct publish_bundle_bound_service {
+	topic_publication_pt parent; // topic publication this per-bundle service belongs to
+	pubsub_publisher_t service; // publisher service struct handed out by the service factory
+	bundle_pt bundle; // bundle that requested the service
+	char *scope; // strdup'ed copy of the endpoint's topic scope; freed on destroy
+	char *topic; // strdup'ed copy of the endpoint's topic name; freed on destroy
+	hash_map_pt msgTypes; // looked up by message type id to obtain a pubsub_msg_serializer_t
+	unsigned short getCount; // number of outstanding getService calls for this bundle
+	celix_thread_mutex_t mp_lock; // taken while sending and while destroying this bound service
+	largeUdp_pt largeUdpHandle; // handle passed to largeUdp_sendmsg
+}* publish_bundle_bound_service_pt;
+
+
+typedef struct pubsub_msg{
+	pubsub_msg_header_pt header; // message header (topic, type id, version); sent as the first iovec segment
+	char* payload; // serialized message bytes; sent as the third iovec segment
+	unsigned int payloadSize; // number of payload bytes; sent as the second iovec segment
+} pubsub_msg_t;
+
+
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
+
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
+
+
+static void delay_first_send_for_late_joiners(void);
+
+
+/**
+ * Creates a topic publication for the given publisher endpoint.
+ *
+ * A port is derived from the endpoint's service id plus a random offset, the
+ * "udp://<bindIP>:<port>" URL is built from it, and the destination address
+ * used for sending is filled in from bindIP and that port. pubEP is then added
+ * as the first publisher endpoint of the new publication.
+ *
+ * @param sendSocket            socket descriptor stored in the publication and used for sending
+ * @param pubEP                 publisher endpoint; its properties are read and updated
+ * @param best_serializer       serializer service stored in the publication
+ * @param best_serializer_type  serializer type string; only the pointer is stored
+ * @param bindIP                IPv4 address (dotted notation) used for the URL and the destination address
+ * @param out                   receives the new publication on success
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when an allocation fails (in that case *out is left untouched).
+ */
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out){
+
+	/* calloc zero-fills the URL buffer, so no separate memset is needed */
+	char* ep = calloc(1, EP_ADDRESS_LEN);
+	topic_publication_pt pub = calloc(1,sizeof(*pub));
+
+	if(ep == NULL || pub == NULL){
+		free(ep);
+		free(pub);
+		return CELIX_ENOMEM;
+	}
+
+	long serviceId =strtol(properties_getWithDefault(pubEP->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, "0"), NULL, 10);
+
+	/* NOTE(review): for large service ids this sum can exceed 65535 and is then truncated by htons() below - confirm the intended port range */
+	unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, UDP_MAX_PORT);
+	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
+
+	arrayList_create(&(pub->pub_ep_list));
+	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
+	celixThreadMutex_create(&(pub->tp_lock),NULL);
+
+	pub->endpoint = ep;
+	pub->sendSocket = sendSocket;
+	pub->destAddr.sin_family = AF_INET;
+	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
+	pub->destAddr.sin_port = htons(port);
+
+	pub->serializer.type = best_serializer_type;
+	pub->serializer.svc = best_serializer;
+
+	/* Also writes the URL, admin type and serializer type into pubEP */
+	pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+
+	*out = pub;
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Releases everything owned by a topic publication: the endpoint URL, the
+ * publisher endpoint list, every per-bundle bound service and the map holding
+ * them, the send socket, the lock and finally the publication itself.
+ *
+ * @return CELIX_SUCCESS, or CELIX_FILE_IO_EXCEPTION when closing the socket fails.
+ */
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
+	celix_status_t status = CELIX_SUCCESS;
+	hash_map_iterator_pt boundIter = NULL;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	free(pub->endpoint);
+	arrayList_destroy(pub->pub_ep_list);
+
+	/* Tear down each per-bundle service before dropping the map that holds them */
+	boundIter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(boundIter)){
+		publish_bundle_bound_service_pt boundSvc = hashMapIterator_nextValue(boundIter);
+		pubsub_destroyPublishBundleBoundService(boundSvc);
+	}
+	hashMapIterator_destroy(boundIter);
+	hashMap_destroy(pub->boundServices,false,false);
+
+	pub->svcFactoryReg = NULL;
+	pub->serializer.svc= NULL;
+	pub->serializer.type= NULL;
+
+	int closeResult = close(pub->sendSocket);
+	if(closeResult != 0){
+		status = CELIX_FILE_IO_EXCEPTION;
+	}
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	celixThreadMutex_destroy(&(pub->tp_lock));
+
+	free(pub);
+
+	return status;
+}
+
+/**
+ * Registers a publisher service factory for this topic publication.
+ *
+ * The scope and topic of the first publisher endpoint are used as service
+ * properties. On success the registration is stored in the publication and the
+ * newly allocated factory is returned through svcFactory.
+ *
+ * @param bundle_context  context used to register the service factory
+ * @param pub             topic publication that becomes the factory handle
+ * @param svcFactory      receives the allocated factory on success only
+ * @return CELIX_SUCCESS on success; CELIX_SERVICE_EXCEPTION when the publication
+ *         has no publisher endpoint; CELIX_ENOMEM when the factory cannot be
+ *         allocated; otherwise the status reported by the registration call.
+ */
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+
+	if(pubEP==NULL){
+		printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	service_factory_pt factory = calloc(1, sizeof(*factory));
+	if(factory==NULL){
+		return CELIX_ENOMEM;
+	}
+	factory->handle = pub;
+	factory->getService = pubsub_topicPublicationGetService;
+	factory->ungetService = pubsub_topicPublicationUngetService;
+
+	properties_pt props = properties_create();
+	properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+	properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
+
+	/* Let's register the new service */
+	status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
+
+	if(status != CELIX_SUCCESS){
+		properties_destroy(props);
+		/* The factory is not handed to the caller on failure, so release it here instead of leaking it */
+		free(factory);
+		printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for scope %s, topic %s (bundle %s).\n",
+			   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+			   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+			   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID));
+	}
+	else{
+		*svcFactory = factory;
+	}
+
+	return status;
+}
+
+/**
+ * Unregisters the publisher service factory of this topic publication.
+ *
+ * @return the status reported by serviceRegistration_unregister.
+ */
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
+	celix_status_t status = serviceRegistration_unregister(pub->svcFactoryReg);
+
+	return status;
+}
+
+/**
+ * Adds a publisher endpoint to the publication.
+ *
+ * Before the endpoint is appended to the list, its URL, admin type and
+ * serializer type fields are set from the publication. Everything happens
+ * while holding the publication lock.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
+	celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+	celixThreadMutex_lock(lock);
+
+	/* Stamp the endpoint with this publication's URL, admin type and serializer type */
+	pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
+
+	arrayList_add(pub->pub_ep_list,ep);
+
+	celixThreadMutex_unlock(lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Removes a publisher endpoint from the publication's endpoint list while
+ * holding the publication lock.
+ *
+ * @return always CELIX_SUCCESS, whether or not the endpoint was in the list.
+ */
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep) {
+	celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+	celixThreadMutex_lock(lock);
+	arrayList_removeElement(pub->pub_ep_list,ep);
+	celixThreadMutex_unlock(lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Returns a clone of the publication's publisher endpoint list, taken while
+ * holding the publication lock.
+ */
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
+	array_list_pt snapshot;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+	snapshot = arrayList_clone(pub->pub_ep_list);
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return snapshot;
+}
+
+
+/*
+ * Service factory "get" callback: hands out the publisher service bound to the
+ * requesting bundle.
+ *
+ * The first request from a bundle creates its bound service (with a get count
+ * of 1) and stores it in the publication; later requests only increase the
+ * get count.
+ *
+ * handle        the topic publication the factory was registered for
+ * bundle        the requesting bundle, used as key for the bound service
+ * registration  unused
+ * service       receives the publisher service, or NULL on failure
+ *
+ * Returns CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when no bound service
+ * could be created for the bundle.
+ */
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+	celix_status_t  status = CELIX_SUCCESS;
+
+	topic_publication_pt publish = (topic_publication_pt)handle;
+
+	celixThreadMutex_lock(&(publish->tp_lock));
+
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	if(bound==NULL){
+		bound = pubsub_createPublishBundleBoundService(publish,bundle);
+		if(bound!=NULL){
+			hashMap_put(publish->boundServices,bundle,bound);
+		}
+	}
+	else{
+		bound->getCount++;
+	}
+
+	if (bound != NULL) {
+		*service = &bound->service;
+	}
+	else {
+		/* Report the failure instead of returning success with *service left untouched */
+		*service = NULL;
+		status = CELIX_SERVICE_EXCEPTION;
+	}
+
+	celixThreadMutex_unlock(&(publish->tp_lock));
+
+	return status;
+}
+
+/*
+ * Service factory "unget" callback: drops one reference to the publisher
+ * service bound to the given bundle. When the get count reaches zero the bound
+ * service is destroyed and removed from the publication. An unget for a bundle
+ * without a bound service is only logged.
+ *
+ * Always sets *service to NULL and returns CELIX_SUCCESS.
+ */
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
+
+	topic_publication_pt publish = (topic_publication_pt)handle;
+
+	celixThreadMutex_lock(&(publish->tp_lock));
+
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	if(bound==NULL){
+		long bundleId = -1;
+		bundle_getBundleId(bundle,&bundleId);
+		printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
+	}
+	else{
+		bound->getCount -= 1;
+		if(bound->getCount==0){
+			/* Last user is gone: release the bound service and forget it */
+			pubsub_destroyPublishBundleBoundService(bound);
+			hashMap_remove(publish->boundServices,bundle);
+		}
+	}
+
+	/* service should be never used for unget, so let's set the pointer to NULL */
+	*service = NULL;
+
+	celixThreadMutex_unlock(&(publish->tp_lock));
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Sends one message as three scatter/gather segments (header, payload size,
+ * payload) through the bound service's large-UDP handle to the parent
+ * publication's destination address.
+ *
+ * bound            bound service providing the large-UDP handle and the parent publication
+ * msg              message to send
+ * last             unused
+ * releaseCallback  optional; when non-NULL its release function is called with
+ *                  the payload after the send attempt, whether or not it succeeded
+ *
+ * Returns true when the send succeeded, false otherwise.
+ */
+static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
+	/* header + size + payload; an enum constant keeps the array fixed-size instead of a VLA */
+	enum { IOVEC_LEN = 3 };
+	bool ret = true;
+
+	(void)last;
+
+	struct iovec msg_iovec[IOVEC_LEN];
+	msg_iovec[0].iov_base = msg->header;
+	msg_iovec[0].iov_len = sizeof(*msg->header);
+	msg_iovec[1].iov_base = &msg->payloadSize;
+	msg_iovec[1].iov_len = sizeof(msg->payloadSize);
+	msg_iovec[2].iov_base = msg->payload;
+	msg_iovec[2].iov_len = msg->payloadSize;
+
+	delay_first_send_for_late_joiners();
+
+	if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, IOVEC_LEN, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) {
+		perror("send_pubsub_msg:sendSocket");
+		ret = false;
+	}
+
+	if(releaseCallback) {
+		releaseCallback->release(msg->payload, bound);
+	}
+	return ret;
+
+}
+
+
+/*
+ * Publisher "send" callback: serializes inMsg with the serializer registered
+ * for msgTypeId and sends it over UDP.
+ *
+ * The message header carries the topic, the message type id and, when the
+ * serializer has a version, its major/minor numbers. The publication lock and
+ * the bound-service lock are both held for the whole call.
+ *
+ * handle     the bundle-bound publisher service
+ * msgTypeId  id used to look up the message serializer
+ * inMsg      message to serialize and send
+ *
+ * Returns 0 on success and -1 when no serializer is known for msgTypeId, when
+ * serialization fails, or when the send fails.
+ */
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+	int status = 0;
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+
+	celixThreadMutex_lock(&(bound->parent->tp_lock));
+	celixThreadMutex_lock(&(bound->mp_lock));
+
+	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(intptr_t)msgTypeId);
+
+	if (msgSer != NULL) {
+		int major=0, minor=0;
+
+		/* Header and message live on the stack, so there is no allocation to fail or leak.
+		 * memset (rather than an initializer) also clears padding, since the header is sent as raw bytes. */
+		struct pubsub_msg_header msg_hdr;
+		memset(&msg_hdr, 0, sizeof(msg_hdr));
+		strncpy(msg_hdr.topic,bound->topic,MAX_TOPIC_LEN-1);
+		msg_hdr.type = msgTypeId;
+
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
+			msg_hdr.major = major;
+			msg_hdr.minor = minor;
+		}
+
+		void* serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+
+		if (msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen) != CELIX_SUCCESS) {
+			/* Do not put a header-only message on the wire when serialization failed */
+			printf("PSA_UDP_MC_TP: Cannot serialize msg with type id %u\n", msgTypeId);
+			status = -1;
+		}
+		else {
+			pubsub_msg_t msg;
+			memset(&msg, 0, sizeof(msg));
+			msg.header = &msg_hdr;
+			msg.payload = (char*)serializedOutput;
+			msg.payloadSize = serializedOutputLen;
+
+			if(send_pubsub_msg(bound, &msg,true, NULL) == false) {
+				status = -1;
+			}
+		}
+
+		free(serializedOutput);
+
+	} else {
+		printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %u\n", msgTypeId);
+		status=-1;
+	}
+
+	celixThreadMutex_unlock(&(bound->mp_lock));
+	celixThreadMutex_unlock(&(bound->parent->tp_lock));
+
+	return status;
+}
+
+/*
+ * Publisher callback that maps a message type name to its local numeric id,
+ * which is the string hash of the name. handle is unused. Always returns 0.
+ */
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+	unsigned int hashedId = utils_stringHash(msgType);
+
+	*msgTypeId = hashedId;
+
+	return 0;
+}
+
+
+/*
+ * Returns a pseudo-random value in the closed range [min, max], drawn from
+ * random(). When max is not greater than min, min is returned.
+ */
+static unsigned int rand_range(unsigned int min, unsigned int max){
+
+	/* An empty or inverted range would wrap the unsigned width below */
+	if(max <= min){
+		return min;
+	}
+
+	/* random() yields values in [0, 2^31-1]; dividing by 2^31 keeps scaled strictly below 1.0,
+	 * so the result can never reach max+1 */
+	double scaled = ((double)random())/2147483648.0;
+	double width = (double)(max-min) + 1.0;
+
+	return (unsigned int)(width*scaled) + min;
+
+}
+
+/*
+ * Creates the publisher service bound to one bundle for the given topic
+ * publication.
+ *
+ * The scope and topic are copied from the publication's first publisher
+ * endpoint, the per-bundle message serializer map is created when a serializer
+ * service is set, and the publisher service callbacks are wired up. The get
+ * count starts at 1.
+ *
+ * Returns the new bound service, or NULL when the publication has no publisher
+ * endpoint, the endpoint has no scope or topic property, or an allocation fails.
+ */
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
+	/* Resolve everything that may be missing before any resource is created */
+	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(tp->pub_ep_list,0);
+	if (pubEP == NULL) {
+		return NULL;
+	}
+
+	const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+	const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME);
+	if (scope == NULL || topic == NULL) {
+		/* strdup(NULL) is undefined behaviour, and a NULL topic would later break sending */
+		return NULL;
+	}
+
+	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+	if (bound == NULL) {
+		return NULL;
+	}
+
+	bound->scope=strdup(scope);
+	bound->topic=strdup(topic);
+	if (bound->scope == NULL || bound->topic == NULL) {
+		free(bound->scope);
+		free(bound->topic);
+		free(bound);
+		return NULL;
+	}
+
+	bound->parent = tp;
+	bound->bundle = bundle;
+	bound->getCount = 1;
+	celixThreadMutex_create(&bound->mp_lock,NULL);
+
+	if (tp->serializer.svc != NULL){
+		tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
+	}
+
+	bound->largeUdpHandle = largeUdp_create(1);
+
+	bound->service.handle = bound;
+	bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+	bound->service.send = pubsub_topicPublicationSend;
+	bound->service.sendMultipart = NULL;  //Multipart not supported for UDP
+
+	return bound;
+}
+
+/*
+ * Destroys a bundle-bound publisher service: releases its message serializer
+ * map (when both a serializer service and a map are present), its scope and
+ * topic strings, its large-UDP handle, its lock and the structure itself.
+ */
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+
+	celixThreadMutex_lock(&boundSvc->mp_lock);
+
+	pubsub_serializer_service_t *serializer = boundSvc->parent->serializer.svc;
+	if(serializer != NULL && boundSvc->msgTypes != NULL){
+		serializer->destroySerializerMap(serializer->handle, boundSvc->msgTypes);
+	}
+
+	/* free(NULL) is a no-op, so the strings need no NULL guard */
+	free(boundSvc->scope);
+	free(boundSvc->topic);
+
+	largeUdp_destroy(boundSvc->largeUdpHandle);
+
+	celixThreadMutex_unlock(&boundSvc->mp_lock);
+	celixThreadMutex_destroy(&boundSvc->mp_lock);
+
+	free(boundSvc);
+
+}
+
+/*
+ * Sleeps FIRST_SEND_DELAY seconds the first time it is called and returns
+ * immediately on every later call. The "already done" state is kept in a
+ * function-local static flag.
+ */
+static void delay_first_send_for_late_joiners(){
+
+	static bool firstSend = true;
+
+	if(!firstSend){
+		return;
+	}
+
+	printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+	sleep(FIRST_SEND_DELAY);
+	firstSend = false;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
new file mode 100644
index 0000000..e0a5698
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
@@ -0,0 +1,57 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "pubsub/publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+#define UDP_BASE_PORT	49152
+#define UDP_MAX_PORT	65000
+
+typedef struct pubsub_udp_msg {
+    struct pubsub_msg_header header; // message header, stored inline
+    unsigned int payloadSize; // presumably the number of bytes in payload - TODO confirm against the receiver
+    char payload[]; // flexible array member: payload bytes follow the fixed part of the struct
+} pubsub_udp_msg_t;
+
+typedef struct topic_publication *topic_publication_pt;
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */


Mime
View raw message