celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [39/51] [partial] celix git commit: CELIX-424: Cleans up the directory structure. Moves all libraries to the libs subdir and all bundles to the bundles subdir
Date Sun, 27 May 2018 18:37:22 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
new file mode 100644
index 0000000..6d0768b
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
@@ -0,0 +1,635 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.c
+ *
+ *  \date       Oct 2, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "utils.h"
+#include "celix_errno.h"
+#include "constants.h"
+#include "version.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+
+#define MAX_EPOLL_EVENTS        10 /* size of the event array the receive thread hands to epoll_wait() */
+#define RECV_THREAD_TIMEOUT     5 /* epoll_wait() timeout in seconds; multiplied by 1000 at the call site */
+#define UDP_BUFFER_SIZE         65535 /* NOTE(review): not referenced in the visible part of this file */
+#define MAX_UDP_SESSIONS        16 /* argument passed to largeUdp_create() */
+
+struct topic_subscription{ /* state of one UDP multicast topic subscription */
+	char* ifIpAddress; /* heap copy of the interface IP, used as imr_interface when joining a multicast group */
+	service_tracker_pt tracker; /* tracks subscriber services matching the topic (and scope) filter */
+	array_list_pt sub_ep_list; /* subscriber endpoints added through pubsub_topicSubscriptionAddSubscriber() */
+	celix_thread_t recv_thread; /* thread running udp_recv_thread_func() */
+	bool running; /* loop condition of the receive thread */
+	celix_thread_mutex_t ts_lock; /* taken around sub_ep_list, servicesMap and nrSubscribers updates */
+	bundle_context_pt context; /* bundle context handed to the create function */
+
+	pubsub_serializer_service_t *serializer; /* used to create/destroy the per-subscriber message type maps */
+
+	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
+	hash_map_pt servicesMap; // key = service, value = msg types map
+	hash_map_pt socketMap; // key = URL, value = listen-socket
+	celix_thread_mutex_t socketMap_lock; /* taken around every socketMap access */
+
+	celix_thread_mutex_t pendingConnections_lock; /* guards pendingConnections */
+	array_list_pt pendingConnections; /* heap-allocated URL strings the receive thread still has to connect */
+
+	array_list_pt pendingDisconnections; /* heap-allocated URL strings the receive thread still has to disconnect */
+	celix_thread_mutex_t pendingDisconnections_lock; /* guards pendingDisconnections */
+
+	//array_list_pt rawServices;
+	unsigned int nrSubscribers; /* counter maintained by the Increase/Decrease functions */
+	largeUdp_pt largeUdpHandle; /* handle from largeUdp_create(), used to poll and read incoming messages */
+};
+
+typedef struct msg_map_entry{ /* NOTE(review): not referenced in the visible part of this file - confirm it is still needed */
+	bool retain;
+	void* msgInst;
+}* msg_map_entry_pt;
+
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); /* tracker "added" callback */
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); /* tracker "removed" callback */
+static void* udp_recv_thread_func(void* arg); /* body of the receive thread */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); /* compares the local message version with a received header */
+static void sigusr1_sighandler(int signo); /* SIGUSR1 handler installed by the create function */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); /* callback handed to subscribers */
+static void connectPendingPublishers(topic_subscription_pt sub); /* drains the pendingConnections list */
+static void disconnectPendingPublishers(topic_subscription_pt sub); /* drains the pendingDisconnections list */
+
+
+/**
+ * Allocates a topic subscription and the service tracker that watches for
+ * subscriber services of the given topic (and scope, when it is not the
+ * default scope).
+ *
+ * @param bundle_context  bundle context used to create the service tracker
+ * @param ifIp            interface IP address; a private copy is stored
+ * @param scope           subscriber scope, compared against PUBSUB_SUBSCRIBER_SCOPE_DEFAULT
+ * @param topic           topic name used in the tracker filter
+ * @param best_serializer serializer service stored in the subscription (not copied)
+ * @param out             receives the new subscription; only written on success
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when an allocation fails, or the status
+ *         of the first failing step. On failure everything allocated here is
+ *         released again.
+ */
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+	celix_status_t status = CELIX_SUCCESS;
+
+	topic_subscription_pt ts = calloc(1,sizeof(*ts));
+	if (ts == NULL) {
+		return CELIX_ENOMEM;
+	}
+	ts->ifIpAddress = strdup(ifIp);
+	if (ts->ifIpAddress == NULL) {
+		free(ts);
+		return CELIX_ENOMEM;
+	}
+	ts->context = bundle_context;
+#if defined(__APPLE__) && defined(__MACH__)
+	//TODO: Use kqueue for OSX
+#else
+	ts->topicEpollFd = epoll_create1(0);
+#endif
+	if(ts->topicEpollFd == -1) {
+		status = CELIX_SERVICE_EXCEPTION;
+	}
+
+	ts->running = false;
+	ts->nrSubscribers = 0;
+	ts->serializer = best_serializer;
+
+	celixThreadMutex_create(&ts->ts_lock,NULL);
+	arrayList_create(&ts->sub_ep_list);
+	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+	arrayList_create(&ts->pendingConnections);
+	arrayList_create(&ts->pendingDisconnections);
+	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+	celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+	celixThreadMutex_create(&ts->socketMap_lock, NULL);
+
+	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
+
+	char filter[128];
+	int written;
+	/* NOTE(review): this is a prefix comparison, so any scope that merely starts
+	 * with the default scope name is treated as the default scope - confirm intended. */
+	if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
+		// default scope, means that subscriber has not defined a scope property
+		written = snprintf(filter, sizeof(filter), "(&(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+	} else {
+		written = snprintf(filter, sizeof(filter), "(&(%s=%s)(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic,
+				PUBSUB_SUBSCRIBER_SCOPE,scope);
+	}
+	/* A truncated filter would be syntactically broken, so refuse it instead of using it. */
+	if (written < 0 || (size_t) written >= sizeof(filter)) {
+		printf("PSA_UDP_MC_TS: Cannot build the service filter for topic %s.\n", topic);
+		status = CELIX_SERVICE_EXCEPTION;
+	}
+
+	service_tracker_customizer_pt customizer = NULL;
+	if (status == CELIX_SUCCESS) {
+		status = serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
+	}
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		/* SIGUSR1 is sent by the stop function to interrupt the receive thread. */
+		struct sigaction actions;
+		memset(&actions, 0, sizeof(actions));
+		sigemptyset(&actions.sa_mask);
+		actions.sa_flags = 0;
+		actions.sa_handler = sigusr1_sighandler;
+
+		sigaction(SIGUSR1,&actions,NULL);
+
+		*out=ts;
+	}
+	else {
+		/* Roll back: nothing is handed to the caller, so release it all here. */
+		if (ts->tracker != NULL) {
+			serviceTracker_destroy(ts->tracker);
+		}
+		else if (customizer != NULL) {
+			serviceTrackerCustomizer_destroy(customizer);
+		}
+		largeUdp_destroy(ts->largeUdpHandle);
+		arrayList_destroy(ts->pendingConnections);
+		arrayList_destroy(ts->pendingDisconnections);
+		arrayList_destroy(ts->sub_ep_list);
+		hashMap_destroy(ts->servicesMap,false,false);
+		hashMap_destroy(ts->socketMap,true,true);
+		celixThreadMutex_destroy(&ts->pendingConnections_lock);
+		celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+		celixThreadMutex_destroy(&ts->socketMap_lock);
+		celixThreadMutex_destroy(&ts->ts_lock);
+#if defined(__APPLE__) && defined(__MACH__)
+		//TODO: Use kqueue for OSX
+#else
+		if (ts->topicEpollFd != -1) {
+			close(ts->topicEpollFd);
+		}
+#endif
+		free(ts->ifIpAddress);
+		free(ts);
+	}
+
+	return status;
+}
+
+/**
+ * Releases a topic subscription and everything it owns: the tracker, the
+ * subscriber list, both maps (closing any socket still registered), the
+ * pending connect/disconnect queues, the large-UDP handle and the epoll
+ * descriptor.
+ *
+ * @param ts subscription to destroy; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	ts->running = false;
+	free(ts->ifIpAddress);
+	serviceTracker_destroy(ts->tracker);
+	arrayList_clear(ts->sub_ep_list);
+	arrayList_destroy(ts->sub_ep_list);
+	hashMap_destroy(ts->servicesMap,false,false);
+
+	celixThreadMutex_lock(&ts->socketMap_lock);
+	/* hashMap_destroy() only frees the int slots; the descriptors stored in them
+	 * have to be closed explicitly or they leak. */
+	hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+	while(hashMapIterator_hasNext(it)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+		int *s = hashMapEntry_getValue(entry);
+		if (s != NULL) {
+			close(*s);
+		}
+	}
+	hashMapIterator_destroy(it);
+	hashMap_destroy(ts->socketMap,true,true);
+	celixThreadMutex_unlock(&ts->socketMap_lock);
+	celixThreadMutex_destroy(&ts->socketMap_lock);
+
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	/* Queued URLs are heap copies owned by the list. */
+	while(!arrayList_isEmpty(ts->pendingConnections)) {
+		free(arrayList_remove(ts->pendingConnections, 0));
+	}
+	arrayList_destroy(ts->pendingConnections);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	while(!arrayList_isEmpty(ts->pendingDisconnections)) {
+		free(arrayList_remove(ts->pendingDisconnections, 0));
+	}
+	arrayList_destroy(ts->pendingDisconnections);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+
+	largeUdp_destroy(ts->largeUdpHandle);
+#if defined(__APPLE__) && defined(__MACH__)
+	//TODO: Use kqueue for OSX
+#else
+	close(ts->topicEpollFd);
+#endif
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	celixThreadMutex_destroy(&ts->ts_lock);
+
+	free(ts);
+
+	return status;
+}
+
+/**
+ * Opens the subscriber tracker and, when that succeeds, launches the receive
+ * thread. The running flag is raised in either case.
+ *
+ * @param ts subscription to start
+ * @return status of serviceTracker_open(), or of celixThread_create() when the tracker opened
+ */
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
+	celix_status_t openStatus = serviceTracker_open(ts->tracker);
+
+	/* Raised before the thread exists so its loop condition holds on entry. */
+	ts->running = true;
+
+	if (openStatus != CELIX_SUCCESS) {
+		return openStatus;
+	}
+
+	return celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts);
+}
+
+/**
+ * Stops the receive thread, closes the subscriber tracker and tears down every
+ * socket registered in the socket map (deregister from epoll, close, free).
+ *
+ * @param ts subscription to stop
+ * @return status of serviceTracker_close(), increased by CELIX_SERVICE_EXCEPTION
+ *         for every socket that could not be removed from the epoll set
+ */
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
+	celix_status_t status = CELIX_SUCCESS;
+	struct epoll_event ev;
+	memset(&ev, 0, sizeof(ev));
+
+	ts->running = false;
+
+	/* Interrupt a blocking epoll_wait() so the thread notices the cleared flag. */
+	pthread_kill(ts->recv_thread.thread,SIGUSR1);
+
+	celixThread_join(ts->recv_thread,NULL);
+
+	status = serviceTracker_close(ts->tracker);
+
+	celixThreadMutex_lock(&ts->socketMap_lock);
+	hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+	while(hashMapIterator_hasNext(it)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+		char *url = hashMapEntry_getKey(entry);
+		int *s = hashMapEntry_getValue(entry);
+		memset(&ev, 0, sizeof(ev));
+		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+			printf("in if error()\n");
+			perror("epoll_ctl() EPOLL_CTL_DEL");
+			status += CELIX_SERVICE_EXCEPTION;
+		}
+		/* Freeing the slot alone would leak the descriptor stored in it. */
+		close(*s);
+		free(s);
+		free(url);
+	}
+	hashMapIterator_destroy(it);
+	/* Keys and values were released above, so only the entries are dropped here. */
+	hashMap_clear(ts->socketMap, false, false);
+	celixThreadMutex_unlock(&ts->socketMap_lock);
+
+
+	return status;
+}
+
+/**
+ * Creates a UDP socket for the publisher URL, joins the multicast group named
+ * in the URL on the subscription's interface, binds to the URL's port,
+ * registers the socket with the epoll set and stores it in the socket map.
+ * Nothing happens when the URL is already present in the map.
+ *
+ * @param ts     subscription that owns the socket map
+ * @param pubURL publisher URL of the form "udp://<ip>:<port>"
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when an allocation fails, or
+ *         CELIX_SERVICE_EXCEPTION when the URL cannot be parsed or a socket
+ *         call fails. On failure the socket is closed again.
+ */
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) {
+
+	printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
+
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->socketMap_lock);
+
+	if(!hashMap_containsKey(ts->socketMap, pubURL)){
+
+		int fd = socket(AF_INET, SOCK_DGRAM, 0);
+		if (fd < 0) {
+			perror("pubsub_topicSubscriptionConnectPublisher:socket");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+
+		if (status == CELIX_SUCCESS){
+			int reuse = 1;
+			if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+				perror("setsockopt() SO_REUSEADDR");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+		}
+
+		char mcIp[100];
+		unsigned short mcPort = 0;
+
+		if(status == CELIX_SUCCESS){
+			// TODO Check if there is a better way to parse the URL to IP/Portnr
+			//replace ':' by spaces
+			char *url = strdup(pubURL);
+			if (url == NULL) {
+				status = CELIX_ENOMEM;
+			}
+			else {
+				char *pt = url;
+				while((pt=strchr(pt, ':')) != NULL) {
+					*pt = ' ';
+				}
+				/* The field width keeps an over-long host part from overrunning mcIp;
+				 * both fields must convert or the URL is rejected. */
+				if (sscanf(url, "udp //%99s %hu", mcIp, &mcPort) != 2) {
+					printf("PSA_UDP_MC_TS: Cannot parse publisher URL %s\n", pubURL);
+					status = CELIX_SERVICE_EXCEPTION;
+				}
+				free(url);
+			}
+		}
+
+		if(status == CELIX_SUCCESS){
+			printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+			struct ip_mreq mc_addr;
+			memset(&mc_addr, 0, sizeof(mc_addr));
+			mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+			mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+			printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+			if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+				perror("setsockopt() IP_ADD_MEMBERSHIP");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+		}
+
+		if (status == CELIX_SUCCESS){
+			struct sockaddr_in mcListenAddr;
+			memset(&mcListenAddr, 0, sizeof(mcListenAddr));
+			mcListenAddr.sin_family = AF_INET;
+			mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+			mcListenAddr.sin_port = htons(mcPort);
+			if(bind(fd, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+				perror("bind()");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+		}
+
+		if (status == CELIX_SUCCESS){
+#if defined(__APPLE__) && defined(__MACH__)
+			//TODO: Use kqueue for OSX
+#else
+			struct epoll_event ev;
+			memset(&ev, 0, sizeof(ev));
+			ev.events = EPOLLIN;
+			ev.data.fd = fd;
+			if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, fd, &ev) == -1) {
+				perror("epoll_ctl() EPOLL_CTL_ADD");
+				status = CELIX_SERVICE_EXCEPTION;
+			}
+#endif
+		}
+
+		if (status == CELIX_SUCCESS){
+			/* The map owns both the key copy and the heap slot holding the descriptor. */
+			int *recvSocket = malloc(sizeof(*recvSocket));
+			char *key = strdup(pubURL);
+			if (recvSocket != NULL && key != NULL) {
+				*recvSocket = fd;
+				hashMap_put(ts->socketMap, key, (void*)recvSocket);
+			}
+			else {
+				free(recvSocket);
+				free(key);
+				status = CELIX_ENOMEM;
+			}
+		}
+
+		/* Closing the descriptor also drops it from the epoll set if it was added. */
+		if (status != CELIX_SUCCESS && fd >= 0) {
+			close(fd);
+		}
+	}
+
+	celixThreadMutex_unlock(&ts->socketMap_lock);
+
+	return status;
+}
+
+/**
+ * Queues a private copy of a publisher URL so that the receive thread connects
+ * to it on its next loop iteration.
+ *
+ * @param ts     subscription owning the pending-connections list
+ * @param pubURL publisher URL; copied, the caller keeps ownership of its string
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when the URL cannot be copied
+ */
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	char *url = strdup(pubURL);
+	/* A NULL entry would later be handed to the connect function and dereferenced. */
+	if (url == NULL) {
+		return CELIX_ENOMEM;
+	}
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	arrayList_add(ts->pendingConnections, url);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Queues a private copy of a publisher URL so that the receive thread
+ * disconnects from it on its next loop iteration.
+ *
+ * @param ts     subscription owning the pending-disconnections list
+ * @param pubURL publisher URL; copied, the caller keeps ownership of its string
+ * @return CELIX_SUCCESS, or CELIX_ENOMEM when the URL cannot be copied
+ */
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	char *url = strdup(pubURL);
+	/* A NULL entry would later be handed to the disconnect function and dereferenced. */
+	if (url == NULL) {
+		return CELIX_ENOMEM;
+	}
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	arrayList_add(ts->pendingDisconnections, url);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Removes the socket registered for a publisher URL: takes it out of the
+ * socket map, deregisters it from the epoll set, closes it and frees its slot.
+ * Nothing happens when the URL is not in the map.
+ *
+ * @param ts     subscription owning the socket map
+ * @param pubURL publisher URL used as map key
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the epoll
+ *         deregistration fails (the socket is closed regardless)
+ */
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
+	printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
+	celix_status_t status = CELIX_SUCCESS;
+	struct epoll_event ev;
+	memset(&ev, 0, sizeof(ev));
+
+	celixThreadMutex_lock(&ts->socketMap_lock);
+
+	if (hashMap_containsKey(ts->socketMap, pubURL)){
+
+#if defined(__APPLE__) && defined(__MACH__)
+		//TODO: Use kqueue for OSX
+#else
+		/* NOTE(review): the key string stored by the connect function is not
+		 * returned by hashMap_remove() and is presumably leaked here - verify
+		 * against the hash map API. */
+		int *s = hashMap_remove(ts->socketMap, pubURL);
+		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+			printf("in if error()\n");
+			perror("epoll_ctl() EPOLL_CTL_DEL");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+		/* Freeing the slot alone would leak the descriptor stored in it. */
+		close(*s);
+		free(s);
+#endif
+
+	}
+
+	celixThreadMutex_unlock(&ts->socketMap_lock);
+
+	return status;
+}
+
+/**
+ * Appends a subscriber endpoint to the subscription's endpoint list while
+ * holding the subscription lock.
+ *
+ * @param ts    subscription to extend
+ * @param subEP endpoint to append; stored as-is
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+	celixThreadMutex_lock(&ts->ts_lock);
+	arrayList_add(ts->sub_ep_list,subEP);
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Increments the subscriber counter while holding the subscription lock.
+ *
+ * @param ts subscription whose counter is incremented
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
+	celixThreadMutex_lock(&ts->ts_lock);
+	ts->nrSubscribers += 1;
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Removes a subscriber endpoint from the subscription's endpoint list while
+ * holding the subscription lock.
+ *
+ * @param ts    subscription to shrink
+ * @param subEP endpoint to remove
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+	celixThreadMutex_lock(&ts->ts_lock);
+	arrayList_removeElement(ts->sub_ep_list,subEP);
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Decrements the subscriber counter while holding the subscription lock.
+ * The counter is unsigned, so it is left at zero instead of wrapping around
+ * when it is already zero.
+ *
+ * @param ts subscription whose counter is decremented
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	if (ts->nrSubscribers > 0) {
+		ts->nrSubscribers--;
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
+/**
+ * Returns the current subscriber counter. The value is read without taking
+ * the subscription lock.
+ */
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
+	unsigned int count = ts->nrSubscribers;
+	return count;
+}
+
+/**
+ * Returns the subscription's own subscriber endpoint list (not a copy).
+ */
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+	array_list_pt endpoints = sub->sub_ep_list;
+	return endpoints;
+}
+
+
+/**
+ * Service tracker callback for a newly tracked subscriber service. For a
+ * service not yet present in the services map, a serializer map is created
+ * for the owning bundle and stored under the service pointer.
+ *
+ * @param handle    the topic subscription registered with the customizer
+ * @param reference service reference, used to look up the owning bundle
+ * @param service   subscriber service instance, used as map key
+ * @return CELIX_SERVICE_EXCEPTION when there is no serializer or no bundle,
+ *         CELIX_SUCCESS otherwise (including when no serializer map was produced)
+ */
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+	topic_subscription_pt ts = handle;
+	celix_status_t rc = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	bool alreadyKnown = hashMap_containsKey(ts->servicesMap, service);
+	if (!alreadyKnown) {
+		bundle_pt owner = NULL;
+		serviceReference_getBundle(reference, &owner);
+
+		if (ts->serializer == NULL || owner == NULL) {
+			printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n");
+			rc = CELIX_SERVICE_EXCEPTION;
+		}
+		else {
+			hash_map_pt serializerMap = NULL;
+			ts->serializer->createSerializerMap(ts->serializer->handle,owner,&serializerMap);
+			if (serializerMap != NULL) {
+				hashMap_put(ts->servicesMap, service, serializerMap);
+				printf("PSA_UDP_MC_TS: New subscriber registered.\n");
+			}
+		}
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return rc;
+}
+
+/**
+ * Service tracker callback for a subscriber service that is going away. The
+ * service's serializer map is removed from the services map and handed back
+ * to the serializer for destruction.
+ *
+ * @param handle    the topic subscription registered with the customizer
+ * @param reference service reference (unused)
+ * @param service   subscriber service instance, used as map key
+ * @return CELIX_SERVICE_EXCEPTION when the stored map or the serializer is
+ *         missing, CELIX_SUCCESS otherwise (including unknown services)
+ */
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+	celix_status_t status = CELIX_SUCCESS;
+	topic_subscription_pt ts = handle;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	if (hashMap_containsKey(ts->servicesMap, service)) {
+		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+		if(msgTypes!=NULL && ts->serializer!=NULL){
+			ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+			/* Reported only here, so the message is not printed for failures
+			 * or for services that were never registered. */
+			printf("PSA_UDP_MC_TS: Subscriber unregistered.\n");
+		}
+		else{
+			printf("PSA_UDP_MC_TS: Cannot unregister subscriber.\n");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
+
+/**
+ * Delivers one received message to every tracked subscriber service. For each
+ * service the serializer for the message type is looked up, the header
+ * version is checked, the payload is deserialized and the subscriber's
+ * receive callback is invoked. The deserialized instance is freed afterwards
+ * unless the subscriber cleared the release flag. Runs under the subscription lock.
+ *
+ * @param sub subscription whose services map is iterated
+ * @param msg received message (header plus payload)
+ */
+static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){
+
+	celixThreadMutex_lock(&sub->ts_lock);
+
+	hash_map_iterator_pt svcIter = hashMapIterator_create(sub->servicesMap);
+	while (hashMapIterator_hasNext(svcIter)) {
+		hash_map_entry_pt svcEntry = hashMapIterator_nextEntry(svcIter);
+		pubsub_subscriber_pt subscriber = hashMapEntry_getKey(svcEntry);
+		hash_map_pt serializers = hashMapEntry_getValue(svcEntry);
+
+		pubsub_msg_serializer_t *ser = hashMap_get(serializers,(void*)(uintptr_t )msg->header.type);
+		if (ser == NULL) {
+			printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type);
+			continue;
+		}
+
+		if (!checkVersion(ser->msgVersion,&msg->header)) {
+			int localMajor=0,localMinor=0;
+			version_getMajor(ser->msgVersion,&localMajor);
+			version_getMinor(ser->msgVersion,&localMinor);
+			printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+					ser->msgName,localMajor,localMinor,msg->header.major,msg->header.minor);
+			continue;
+		}
+
+		void *decoded = NULL;
+		celix_status_t rc = ser->deserialize(ser, (const void *) msg->payload, 0, &decoded);
+		if (rc != CELIX_SUCCESS) {
+			printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",ser->msgName);
+			continue;
+		}
+
+		/* The subscriber may clear this flag to keep the instance. */
+		bool release = true;
+		pubsub_multipart_callbacks_t callbacks;
+		callbacks.handle = sub;
+		callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
+		callbacks.getMultipart = NULL;
+
+		subscriber->receive(subscriber->handle, ser->msgName, msg->header.type, decoded, &callbacks, &release);
+
+		if (release) {
+			ser->freeMsg(ser,decoded);
+		}
+	}
+	hashMapIterator_destroy(svcIter);
+
+	celixThreadMutex_unlock(&sub->ts_lock);
+}
+
+/**
+ * Body of the receive thread. While the running flag is set it waits on the
+ * epoll set (up to RECV_THREAD_TIMEOUT seconds), reads every complete message
+ * the large-UDP layer reports, dispatches it to the subscribers and frees it,
+ * then processes the pending connect and disconnect queues.
+ *
+ * @param arg the topic subscription
+ * @return always NULL
+ */
+static void* udp_recv_thread_func(void * arg) {
+	topic_subscription_pt sub = (topic_subscription_pt) arg;
+
+#if defined(__APPLE__) && defined(__MACH__)
+	//TODO: use kqueue for OSX
+	//struct kevent events[MAX_EPOLL_EVENTS];
+	while (sub->running) {
+		int nfds = 0;
+		if(nfds > 0) {
+			pubsub_udp_msg_t* udpMsg = NULL;
+			process_msg(sub, udpMsg);
+		}
+	}
+#else
+	struct epoll_event events[MAX_EPOLL_EVENTS];
+
+	while (sub->running) {
+		/* A negative result (error or interrupting signal) simply skips the
+		 * dispatch loop below. */
+		int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+		int i;
+		for(i = 0; i < nfds; i++ ) {
+			unsigned int index;
+			unsigned int size;
+			if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
+				// Handle data
+				pubsub_udp_msg_t *udpMsg = NULL;
+				if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
+					/* index is unsigned, so it is printed with %u. */
+					printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %u\n", index);
+					continue;
+				}
+
+				process_msg(sub, udpMsg);
+
+				free(udpMsg);
+			}
+		}
+		connectPendingPublishers(sub);
+		disconnectPendingPublishers(sub);
+	}
+#endif
+
+	return NULL;
+}
+
+/**
+ * Drains the pending-connections queue: each queued URL is removed from the
+ * front, passed to the connect function and freed. The queue lock is held for
+ * the whole drain.
+ */
+static void connectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingConnections_lock);
+	for (;;) {
+		if (arrayList_isEmpty(sub->pendingConnections)) {
+			break;
+		}
+		char *queuedUrl = arrayList_remove(sub->pendingConnections, 0);
+		pubsub_topicSubscriptionConnectPublisher(sub, queuedUrl);
+		free(queuedUrl);
+	}
+	celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+/**
+ * Drains the pending-disconnections queue: each queued URL is removed from
+ * the front, passed to the disconnect function and freed. The queue lock is
+ * held for the whole drain.
+ */
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+	for (;;) {
+		if (arrayList_isEmpty(sub->pendingDisconnections)) {
+			break;
+		}
+		char *queuedUrl = arrayList_remove(sub->pendingDisconnections, 0);
+		pubsub_topicSubscriptionDisconnectPublisher(sub, queuedUrl);
+		free(queuedUrl);
+	}
+	celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+/**
+ * SIGUSR1 handler. Its only job is to exist so that a blocking call in the
+ * receive thread is interrupted; it just emits a shutdown notice.
+ * The notice is written with write() because printf() is not
+ * async-signal-safe and must not be called from a signal handler.
+ *
+ * @param signo signal number (unused)
+ */
+static void sigusr1_sighandler(int signo){
+	static const char notice[] = "PSA_UDP_MC_TS: Topic subscription being shut down...\n";
+	(void) signo;
+	/* Best-effort diagnostic: nothing useful can be done if the write fails. */
+	ssize_t ignored = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
+	(void) ignored;
+}
+
+/**
+ * Decides whether a received message header is compatible with the locally
+ * known message version.
+ *
+ * @param msgVersion local message version; NULL is never compatible
+ * @param hdr        header of the received message
+ * @return true when the majors are equal and the received minor is greater
+ *         than or equal to the local minor (both compared as unsigned char)
+ */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+	int localMajor = 0;
+	int localMinor = 0;
+
+	if (msgVersion == NULL) {
+		return false;
+	}
+
+	version_getMajor(msgVersion,&localMajor);
+	version_getMinor(msgVersion,&localMinor);
+
+	/* Different major means incompatible */
+	if (hdr->major != ((unsigned char)localMajor)) {
+		return false;
+	}
+
+	/* Compatible only if the provider has a minor equal or greater (means compatible update) */
+	return hdr->minor >= ((unsigned char)localMinor);
+}
+
+/**
+ * Callback handed to subscribers: derives the message type id from the
+ * message type name by hashing the name with utils_stringHash().
+ *
+ * @param handle    unused
+ * @param msgType   message type name
+ * @param msgTypeId receives the hash of msgType
+ * @return always 0
+ */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+	unsigned int typeHash = utils_stringHash(msgType);
+	*msgTypeId = typeHash;
+	return 0;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
new file mode 100644
index 0000000..475416a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
@@ -0,0 +1,60 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+#include "pubsub_serializer.h"
+
+typedef struct topic_subscription* topic_subscription_pt; /* opaque handle; the struct itself is defined in topic_subscription.c */
+
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out); /* allocates a subscription and its subscriber tracker; *out is set on success */
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); /* frees the subscription and the resources it holds */
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); /* opens the tracker and starts the receive thread */
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); /* stops the receive thread, closes the tracker and empties the socket map */
+
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); /* queues a copy of pubURL; the receive thread performs the connect */
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); /* queues a copy of pubURL; the receive thread performs the disconnect */
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); /* creates and registers a listen socket for pubURL if none is registered yet */
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); /* removes the socket registered for pubURL, if any */
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); /* appends subEP to the subscriber endpoint list */
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); /* removes subEP from the subscriber endpoint list */
+
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); /* returns the internal endpoint list, not a copy */
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); /* increments the subscriber counter */
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); /* decrements the subscriber counter */
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); /* returns the subscriber counter */
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
new file mode 100644
index 0000000..ddc17eb
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_PUBSUB_PSA_ZMQ) # everything below is skipped unless the ZMQ pubsub admin is enabled
+
+	find_package(ZMQ REQUIRED)
+	find_package(CZMQ REQUIRED)
+	find_package(Jansson REQUIRED)
+
+	if (BUILD_ZMQ_SECURITY) # optional: adds the crypto source and the OpenSSL dependency
+		add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
+
+		find_package(OpenSSL 1.1.0 REQUIRED)
+		include_directories("${OPENSSL_INCLUDE_DIR}")
+
+		set (ZMQ_CRYPTO_C "private/src/zmq_crypto.c") # NOTE(review): the other sources below live under src/ - confirm this path is still valid
+	endif()
+
+	add_celix_bundle(celix_pubsub_admin_zmq
+			BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+			VERSION "1.0.0"
+			SOURCES
+			src/psa_activator.c
+			src/pubsub_admin_impl.c
+			src/topic_subscription.c
+			src/topic_publication.c
+			${ZMQ_CRYPTO_C} # empty unless BUILD_ZMQ_SECURITY is set
+	)
+
+	set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
+	target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+			Celix::pubsub_spi
+			Celix::framework Celix::dfi Celix::log_helper
+			${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}
+	)
+	target_include_directories(celix_pubsub_admin_zmq PRIVATE
+		${ZMQ_INCLUDE_DIR}
+		${CZMQ_INCLUDE_DIR}
+		${JANSSON_INCLUDE_DIR}
+		src
+	)
+
+	install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub)
+	target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api) # needed for the shell command registered in psa_activator.c
+	add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
+endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
new file mode 100644
index 0000000..008dff5
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -0,0 +1,186 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+
+#include "pubsub_admin_impl.h"
+
+
+struct activator { /* per-bundle state handed around as userData */
+	pubsub_admin_pt admin; /* created by pubsubAdmin_create() in bundleActivator_create() */
+	pubsub_admin_service_pt adminService; /* allocated in bundleActivator_start(), freed in bundleActivator_stop() */
+	service_registration_pt registration; /* registration of the PUBSUB_ADMIN_SERVICE */
+	service_tracker_pt serializerTracker; /* tracks PUBSUB_SERIALIZER_SERVICE services */
+};
+
+
+/**
+ * Shell command implementation: prints the keys of the admin's external
+ * publications, local publications, active subscriptions and pending
+ * subscriptions maps to outStream. A section is printed only when its map
+ * exists and is not empty.
+ *
+ * @param handle      the activator registered as command handle
+ * @param commandLine unused
+ * @param outStream   stream receiving the overview
+ * @param errorStream unused
+ * @return always CELIX_SUCCESS
+ */
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+    struct activator *self = (struct activator*)handle;
+
+    if (self->admin->externalPublications != NULL && !hashMap_isEmpty(self->admin->externalPublications)) {
+        hash_map_iterator_t cursor = hashMapIterator_construct(self->admin->externalPublications);
+        fprintf(outStream, "External Publications:\n");
+        while (hashMapIterator_hasNext(&cursor)) {
+            const char *name = (const char*)hashMapIterator_nextKey(&cursor);
+            fprintf(outStream, "    %s\n", name);
+        }
+    }
+
+    if (self->admin->localPublications != NULL && !hashMap_isEmpty(self->admin->localPublications)) {
+        hash_map_iterator_t cursor = hashMapIterator_construct(self->admin->localPublications);
+        fprintf(outStream, "Local Publications:\n");
+        while (hashMapIterator_hasNext(&cursor)) {
+            const char *name = (const char*)hashMapIterator_nextKey(&cursor);
+            fprintf(outStream, "    %s\n", name);
+        }
+    }
+
+    if (self->admin->subscriptions != NULL && !hashMap_isEmpty(self->admin->subscriptions)) {
+        hash_map_iterator_t cursor = hashMapIterator_construct(self->admin->subscriptions);
+        fprintf(outStream, "Active Subscriptions:\n");
+        while (hashMapIterator_hasNext(&cursor)) {
+            const char *name = (const char*)hashMapIterator_nextKey(&cursor);
+            fprintf(outStream, "    %s\n", name);
+        }
+    }
+
+    if (self->admin->pendingSubscriptions != NULL && !hashMap_isEmpty(self->admin->pendingSubscriptions)) {
+        hash_map_iterator_t cursor = hashMapIterator_construct(self->admin->pendingSubscriptions);
+        fprintf(outStream, "Pending Subscriptions:\n");
+        while (hashMapIterator_hasNext(&cursor)) {
+            const char *name = (const char*)hashMapIterator_nextKey(&cursor);
+            fprintf(outStream, "    %s\n", name);
+        }
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates the activator state: the pubsub admin, a tracker for serializer
+ * services and the "psa_zmq_info" shell command registration.
+ *
+ * @param context  bundle context
+ * @param userData receives the activator on success and NULL on failure
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the activator cannot be allocated,
+ *         or the status of the first failing step. On failure everything
+ *         created here is released, so no dangling admin pointer is exposed.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator;
+
+	*userData = NULL;
+
+	activator = calloc(1, sizeof(*activator));
+	if (!activator) {
+		return CELIX_ENOMEM;
+	}
+
+	status = pubsubAdmin_create(context, &(activator->admin));
+	if (status != CELIX_SUCCESS) {
+		free(activator);
+		return status;
+	}
+
+	service_tracker_customizer_pt customizer = NULL;
+	status = serviceTrackerCustomizer_create(activator->admin,
+			NULL,
+			pubsubAdmin_serializerAdded,
+			NULL,
+			pubsubAdmin_serializerRemoved,
+			&customizer);
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+		if (status != CELIX_SUCCESS) {
+			serviceTrackerCustomizer_destroy(customizer);
+		}
+	}
+
+	if (status != CELIX_SUCCESS) {
+		pubsubAdmin_destroy(activator->admin);
+		free(activator);
+		return status;
+	}
+
+	properties_pt shellProps = properties_create();
+	properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_zmq_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_zmq_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin");
+	activator->admin->shellCmdService.handle = activator;
+	activator->admin->shellCmdService.executeCommand = shellCommand;
+	/* A failed shell-command registration does not fail activator creation. */
+	bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &activator->admin->shellCmdService, shellProps, &activator->admin->shellCmdReg);
+
+	*userData = activator;
+
+	return status;
+}
+
+/**
+ * Allocates and fills the pubsub admin service struct, registers it under
+ * PUBSUB_ADMIN_SERVICE and opens the serializer tracker.
+ *
+ * @param userData the activator created by bundleActivator_create()
+ * @param context  bundle context used for the registration
+ * @return CELIX_ENOMEM when the service struct cannot be allocated, otherwise
+ *         the sum of the registration status and the tracker-open status
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+	pubsub_admin_service_pt svc = calloc(1, sizeof(*svc));
+
+	if (svc == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	svc->admin = act->admin;
+
+	svc->addPublication = pubsubAdmin_addPublication;
+	svc->removePublication = pubsubAdmin_removePublication;
+
+	svc->addSubscription = pubsubAdmin_addSubscription;
+	svc->removeSubscription = pubsubAdmin_removeSubscription;
+
+	svc->closeAllPublications = pubsubAdmin_closeAllPublications;
+	svc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+
+	svc->matchEndpoint = pubsubAdmin_matchEndpoint;
+
+	/* Stored on the activator so that the stop function can free it. */
+	act->adminService = svc;
+
+	celix_status_t rc = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, svc, NULL, &act->registration);
+	rc += serviceTracker_open(act->serializerTracker);
+
+	return rc;
+}
+
+/**
+ * Closes the serializer tracker, unregisters the shell command and the admin
+ * service, and frees the admin service struct.
+ *
+ * @param userData the activator created by bundleActivator_create()
+ * @param context  unused
+ * @return sum of the tracker-close status and the admin service unregister
+ *         status (the shell command unregister status is not included)
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	celix_status_t closeStatus = serviceTracker_close(act->serializerTracker);
+
+	serviceRegistration_unregister(act->admin->shellCmdReg);
+	act->admin->shellCmdReg = NULL;
+
+	celix_status_t unregisterStatus = serviceRegistration_unregister(act->registration);
+	act->registration = NULL;
+
+	free(act->adminService);
+	act->adminService = NULL;
+
+	return CELIX_SUCCESS + closeStatus + unregisterStatus;
+}
+
+/**
+ * Destroys the serializer tracker and the pubsub admin, then frees the
+ * activator itself.
+ *
+ * @param userData the activator created by bundleActivator_create()
+ * @param context  unused
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	struct activator *act = userData;
+
+	serviceTracker_destroy(act->serializerTracker);
+
+	pubsubAdmin_destroy(act->admin);
+	act->admin = NULL;
+
+	free(act);
+
+	return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
new file mode 100644
index 0000000..e9bb6c3
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -0,0 +1,1183 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "pubsub_admin_impl.h"
+#include <zmq.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#ifndef ANDROID
+#include <ifaddrs.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "celix_threads.h"
+#include "service_factory.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_utils.h"
+#include "pubsub/subscriber.h"
+
+#define MAX_KEY_FOLDER_PATH_LENGTH 512
+
+static const char *DEFAULT_IP = "127.0.0.1";
+
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
+
+/**
+ * Allocates and initialises a ZMQ pubsub admin.
+ *
+ * Steps performed:
+ *  - allocates the admin struct and creates its hash maps, array lists and
+ *    mutexes (the noSerializerPendings and pendingSubscriptions locks are
+ *    created as recursive mutexes);
+ *  - creates and starts a log helper;
+ *  - determines the IP address to announce: the PSA_IP property, otherwise
+ *    (non-Android builds) the address of the interface named by PSA_ITF,
+ *    otherwise DEFAULT_IP;
+ *  - reads the base/max port properties, falling back to the compiled-in
+ *    defaults when a property is absent, empty or not a plain decimal number;
+ *  - optionally sets the number of ZMQ io threads from PSA_NR_ZMQ_THREADS
+ *    (accepted range: 1..49);
+ *  - when built with BUILD_WITH_ZMQ_SECURITY, starts a zauth actor and points
+ *    it at the subscriber public key folder;
+ *  - reads the score and verbose settings.
+ *
+ * @param context  bundle context used for property lookup and the log helper.
+ * @param admin    out parameter receiving the newly allocated admin.
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the admin struct cannot be
+ *         allocated, or CELIX_SERVICE_EXCEPTION when security is compiled in
+ *         but zsys_has_curve() reports no curve support.
+ */
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+    celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    if (!zsys_has_curve()){
+        printf("PSA_ZMQ: zeromq curve unsupported\n");
+        return CELIX_SERVICE_EXCEPTION;
+    }
+#endif
+
+    *admin = calloc(1, sizeof(**admin));
+
+    if (!*admin){
+        return CELIX_ENOMEM;
+    }
+
+    const char *ip = NULL;
+    char *detectedIp = NULL;
+    (*admin)->bundle_context= context;
+    (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+    (*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
+    arrayList_create(&((*admin)->noSerializerSubscriptions));
+    arrayList_create(&((*admin)->noSerializerPublications));
+    arrayList_create(&((*admin)->serializerList));
+
+    celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+    celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+    celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+    celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+    celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+    /* These two locks are recursive: they are re-acquired on the same thread
+     * (e.g. pubsubAdmin_addPublication calls pubsubAdmin_addSubscription while
+     * holding pendingSubscriptionsLock). */
+    celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
+    celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+    celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
+
+    celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+    celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+    celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+    if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+        logHelper_start((*admin)->loghelper);
+    }
+
+    bundleContext_getProperty(context,PSA_IP , &ip);
+
+#ifndef ANDROID
+    if (ip == NULL) {
+        const char *interface = NULL;
+
+        bundleContext_getProperty(context, PSA_ITF, &interface);
+        if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
+            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface);
+        }
+
+        ip = detectedIp;
+    }
+#endif
+
+    if (ip != NULL) {
+        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
+        (*admin)->ipAddress = strdup(ip);
+    }
+    else {
+        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
+        (*admin)->ipAddress = strdup(DEFAULT_IP);
+    }
+
+    /* ipAddress holds its own copy, so the detected string can be released. */
+    free(detectedIp);
+
+    /* Port range: start from the compiled-in defaults and override them only
+     * with a property value that is a complete, non-empty decimal number.
+     * Previously the macro *names* were passed as string defaults and the
+     * numeric default was reached only because strtol() failed on them; an
+     * empty property value slipped through as port 0. */
+    const char* basePortStr = NULL;
+    const char* maxPortStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_BASE_PORT, &basePortStr);
+    bundleContext_getProperty(context, PSA_ZMQ_MAX_PORT, &maxPortStr);
+
+    (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
+    (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
+
+    if (basePortStr != NULL) {
+        char* endptrBase = NULL;
+        long parsedBase = strtol(basePortStr, &endptrBase, 10);
+        if (endptrBase != basePortStr && *endptrBase == '\0') {
+            (*admin)->basePort = parsedBase;
+        }
+    }
+    if (maxPortStr != NULL) {
+        char* endptrMax = NULL;
+        long parsedMax = strtol(maxPortStr, &endptrMax, 10);
+        if (endptrMax != maxPortStr && *endptrMax == '\0') {
+            (*admin)->maxPort = parsedMax;
+        }
+    }
+
+    // Disable Signal Handling by CZMQ
+    setenv("ZSYS_SIGHANDLER", "false", true);
+
+    const char *nrZmqThreads = NULL;
+    bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
+
+    if(nrZmqThreads != NULL) {
+        char *endPtr = NULL;
+        unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10);
+        if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) {
+            zsys_set_io_threads(nrThreads);
+            /* nrThreads is unsigned, so it is printed with %u. */
+            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %u threads for ZMQ", nrThreads);
+            printf("PSA_ZMQ: Using %u threads for ZMQ\n", nrThreads);
+        }
+    }
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    // Setup authenticator
+    zactor_t* auth = zactor_new (zauth, NULL);
+    zstr_sendx(auth, "VERBOSE", NULL);
+
+    // Load all public keys of subscribers into the application
+    // This step is done for authenticating subscribers
+    char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
+    char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
+    snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
+    zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
+    free(keys_bundle_dir);
+
+    (*admin)->zmq_auth = auth;
+#endif
+
+
+    /* Scores: compiled-in defaults, each overridable through its property. */
+    (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE;
+    (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE;
+    (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE;
+
+    const char *defaultScoreStr = NULL;
+    const char *sampleScoreStr = NULL;
+    const char *controlScoreStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+    if (defaultScoreStr != NULL) {
+        (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+    }
+    if (sampleScoreStr != NULL) {
+        (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+    }
+    if (controlScoreStr != NULL) {
+        (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+    }
+
+    /* Verbose is enabled when the property value starts with "true"
+     * (case-insensitive prefix match). */
+    (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE;
+    const char *verboseStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr);
+    if (verboseStr != NULL) {
+        (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+    }
+
+    if ((*admin)->verbose) {
+        printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
+    }
+
+    return status;
+}
+
+
+/**
+ * Tears down a pubsub admin created by pubsubAdmin_create.
+ *
+ * Every container is destroyed while holding the lock that guards it; the
+ * locks (and the two mutex attributes) are destroyed afterwards, followed by
+ * the log helper, the optional ZMQ authenticator and the admin struct itself.
+ *
+ * For pendingSubscriptions and externalPublications the key strings are freed
+ * and the array-list values destroyed here. For the two per-serializer maps
+ * only the array-list values are destroyed. The subscriptions map is destroyed
+ * without freeing keys or values; localPublications is destroyed with its
+ * keys freed.
+ *
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+{
+    free(admin->ipAddress);
+
+    /* Pending subscriptions: topic key -> list of endpoints. */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+    hash_map_iterator_pt pendingIt = hashMapIterator_create(admin->pendingSubscriptions);
+    while (hashMapIterator_hasNext(pendingIt)) {
+        hash_map_entry_pt pendingEntry = hashMapIterator_nextEntry(pendingIt);
+        char *pendingKey = (char*)hashMapEntry_getKey(pendingEntry);
+        array_list_pt pendingList = (array_list_pt)hashMapEntry_getValue(pendingEntry);
+        free(pendingKey);
+        arrayList_destroy(pendingList);
+    }
+    hashMapIterator_destroy(pendingIt);
+    hashMap_destroy(admin->pendingSubscriptions, false, false);
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+    /* Active subscriptions: only the map itself is released here. */
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    hashMap_destroy(admin->subscriptions, false, false);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    /* Local publications: keys are freed by the map, values are not. */
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+    hashMap_destroy(admin->localPublications, true, false);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+    /* External publications: topic key -> list of endpoints. */
+    celixThreadMutex_lock(&admin->externalPublicationsLock);
+    hash_map_iterator_pt externalIt = hashMapIterator_create(admin->externalPublications);
+    while (hashMapIterator_hasNext(externalIt)) {
+        hash_map_entry_pt externalEntry = hashMapIterator_nextEntry(externalIt);
+        char *externalKey = (char*)hashMapEntry_getKey(externalEntry);
+        array_list_pt externalList = (array_list_pt)hashMapEntry_getValue(externalEntry);
+        free(externalKey);
+        arrayList_destroy(externalList);
+    }
+    hashMapIterator_destroy(externalIt);
+    hashMap_destroy(admin->externalPublications, false, false);
+    celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    arrayList_destroy(admin->serializerList);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+    arrayList_destroy(admin->noSerializerSubscriptions);
+    arrayList_destroy(admin->noSerializerPublications);
+    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+    /* Per-serializer bookkeeping: both maps hold array lists as values. */
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+
+    hash_map_iterator_pt subsPerSerIt = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+    while (hashMapIterator_hasNext(subsPerSerIt)) {
+        array_list_pt subsList = (array_list_pt)hashMapIterator_nextValue(subsPerSerIt);
+        arrayList_destroy(subsList);
+    }
+    hashMapIterator_destroy(subsPerSerIt);
+    hashMap_destroy(admin->topicSubscriptionsPerSerializer, false, false);
+
+    hash_map_iterator_pt pubsPerSerIt = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+    while (hashMapIterator_hasNext(pubsPerSerIt)) {
+        array_list_pt pubsList = (array_list_pt)hashMapIterator_nextValue(pubsPerSerIt);
+        arrayList_destroy(pubsList);
+    }
+    hashMapIterator_destroy(pubsPerSerIt);
+    hashMap_destroy(admin->topicPublicationsPerSerializer, false, false);
+
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+    /* With all containers gone, release the locks and mutex attributes. */
+    celixThreadMutex_destroy(&admin->usedSerializersLock);
+    celixThreadMutex_destroy(&admin->serializerListLock);
+    celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+
+    celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
+    celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+
+    celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+    celixThreadMutex_destroy(&admin->subscriptionsLock);
+
+    celixThreadMutex_destroy(&admin->localPublicationsLock);
+    celixThreadMutex_destroy(&admin->externalPublicationsLock);
+
+    logHelper_stop(admin->loghelper);
+    logHelper_destroy(&admin->loghelper);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    if (admin->zmq_auth != NULL){
+        zactor_destroy(&(admin->zmq_auth));
+    }
+#endif
+
+    free(admin);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Creates the single subscription stored under PUBSUB_ANY_SUB_TOPIC, if it
+ * does not exist yet.
+ *
+ * Runs entirely under admin->subscriptionsLock. When no entry exists for
+ * PUBSUB_ANY_SUB_TOPIC, a serializer is selected for subEP, a topic
+ * subscription is created, connected to the URL of every known local and
+ * external publisher endpoint, given subEP as subscriber and started. Only if
+ * all of these steps leave status at CELIX_SUCCESS is the subscription stored
+ * in admin->subscriptions and linked to the serializer.
+ *
+ * When an entry already exists nothing is done and CELIX_SUCCESS is returned
+ * (subEP is not added to the existing subscription by this function).
+ *
+ * When no serializer is found, subEP is appended to
+ * admin->noSerializerSubscriptions and the error status from
+ * pubsubAdmin_getBestSerializer is returned.
+ *
+ * @param admin  the pubsub admin.
+ * @param subEP  the subscriber endpoint requesting the subscription.
+ * @return CELIX_SUCCESS, or a non-zero status built from the failing steps
+ *         (statuses of the connect/start calls are summed with +=).
+ */
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt any_sub = hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+
+    if(any_sub==NULL){
+
+        int i;
+        pubsub_serializer_service_t *best_serializer = NULL;
+        const char *serType = NULL;
+        if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+            status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, serType, &any_sub);
+        }
+        else{
+            /* No serializer available: remember the endpoint in the no-serializer list. */
+            printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+                    properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+            celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+            arrayList_add(admin->noSerializerSubscriptions,subEP);
+            celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+        }
+
+        if (status == CELIX_SUCCESS){
+
+            /* Connect all internal publishers */
+            celixThreadMutex_lock(&admin->localPublicationsLock);
+            hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
+            while(hashMapIterator_hasNext(lp_iter)){
+                service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
+                topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+                array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                if(topic_publishers!=NULL){
+                    for(i=0;i<arrayList_size(topic_publishers);i++){
+                        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                        /* Only endpoints that carry a URL can be connected to. */
+                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                    /* The returned list is destroyed here; presumably it is a copy owned by the caller - TODO confirm. */
+                    arrayList_destroy(topic_publishers);
+                }
+            }
+            hashMapIterator_destroy(lp_iter);
+            celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+            /* Connect also all external publishers */
+            celixThreadMutex_lock(&admin->externalPublicationsLock);
+            hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
+            while(hashMapIterator_hasNext(extp_iter)){
+                array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
+                if(ext_pub_list!=NULL){
+                    for(i=0;i<arrayList_size(ext_pub_list);i++){
+                        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                }
+            }
+            hashMapIterator_destroy(extp_iter);
+            celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+
+            pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+
+            status += pubsub_topicSubscriptionStart(any_sub);
+
+        }
+
+        /* NOTE(review): if a connect/start step failed after the subscription was
+         * created, any_sub is neither stored nor released in this function -
+         * looks like a leak; confirm. */
+        if (status == CELIX_SUCCESS){
+            /* The map key is a private copy of the topic name. */
+            hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+            connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false);
+        }
+
+    }
+
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return status;
+}
+
+/**
+ * A subscriber service is registered and this PSA has won the match
+ * (based on qos or other meta data).
+ *
+ * Behaviour, as implemented below:
+ *  - A subscriber whose topic name equals PUBSUB_ANY_SUB_TOPIC is delegated
+ *    to pubsubAdmin_addAnySubscription.
+ *  - Otherwise the pendingSubscriptions, subscriptions, localPublications and
+ *    externalPublications locks are taken (in that order) and released in
+ *    reverse order at the end.
+ *  - If neither a local nor an external publisher is known for the
+ *    scope/topic key, the endpoint is handed to
+ *    pubsubAdmin_addSubscriptionToPendingList (its result is ignored).
+ *  - If a publisher is known and no subscription exists yet, a serializer is
+ *    selected, the subscription is created, connected to every publisher
+ *    endpoint that has a URL, given subEP as subscriber, started, stored in
+ *    admin->subscriptions and linked to the serializer. If no serializer is
+ *    found, subEP is appended to admin->noSerializerSubscriptions and the
+ *    error status is returned.
+ *  - Whenever status is CELIX_SUCCESS in the publisher-known branch, the
+ *    subscriber count of the (new or existing) subscription is increased.
+ *
+ * @param admin  the pubsub admin.
+ * @param subEP  the subscriber endpoint to add.
+ * @return CELIX_SUCCESS, or a non-zero status built from the failing steps
+ *         (statuses are summed with +=).
+ */
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    celix_status_t status = CELIX_SUCCESS;
+
+    if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
+        return pubsubAdmin_addAnySubscription(admin,subEP);
+    }
+
+    /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+    celixThreadMutex_lock(&admin->externalPublicationsLock);
+
+    /* Heap-allocated lookup key; freed before the locks are released. */
+    char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+    array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+
+    if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
+        pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+    }
+    else{
+        int i;
+        topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
+
+        if(subscription == NULL) {
+            pubsub_serializer_service_t *best_serializer = NULL;
+            const char *serType = NULL;
+            if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+                status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription);
+            }
+            else{
+                /* No serializer available: remember the endpoint in the no-serializer list. */
+                printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+                        properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerSubscriptions,subEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+
+            if (status==CELIX_SUCCESS){
+
+                /* Try to connect internal publishers */
+                if(factory!=NULL){
+                    topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
+                    array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                    if(topic_publishers!=NULL){
+                        for(i=0;i<arrayList_size(topic_publishers);i++){
+                            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                            /* Only endpoints that carry a URL can be connected to. */
+                            if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                                status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                            }
+                        }
+                        /* The returned list is destroyed here; presumably it is a copy owned by the caller - TODO confirm. */
+                        arrayList_destroy(topic_publishers);
+                    }
+
+                }
+
+                /* Look also for external publishers */
+                if(ext_pub_list!=NULL){
+                    for(i=0;i<arrayList_size(ext_pub_list);i++){
+                        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                        if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
+                            status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                }
+
+                pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+                status += pubsub_topicSubscriptionStart(subscription);
+
+            }
+
+            /* NOTE(review): if a connect/start step failed after the subscription was
+             * created, it is neither stored nor released in this function - confirm. */
+            if(status==CELIX_SUCCESS){
+
+                /* The map keeps its own copy of the key; scope_topic is freed below. */
+                hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+
+                connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
+            }
+        }
+
+        /* Counts this subscriber on both a newly created and an already existing subscription. */
+        if (status == CELIX_SUCCESS){
+            pubsub_topicIncreaseNrSubscribers(subscription);
+        }
+    }
+
+    free(scope_topic);
+    celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+
+    /* The summary is printed regardless of status, i.e. also when the endpoint was only queued. */
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    return status;
+
+}
+
+/**
+ * Removes a subscriber endpoint from this PSA.
+ *
+ * If a subscription exists for the endpoint's scope/topic key, its subscriber
+ * count is decreased and, once that count reaches zero, subEP is removed from
+ * the subscription. If no subscription exists, the endpoint is looked for in
+ * admin->noSerializerSubscriptions instead.
+ *
+ * @param admin  the pubsub admin.
+ * @param subEP  the subscriber endpoint to remove.
+ * @return CELIX_SUCCESS; the status of
+ *         pubsub_topicSubscriptionRemoveSubscriber when the last subscriber is
+ *         removed; or CELIX_ILLEGAL_STATE when the endpoint is neither part of
+ *         a subscription nor in the no-serializer list.
+ */
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    celix_status_t result = CELIX_SUCCESS;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    char* key = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    topic_subscription_pt topicSub = (topic_subscription_pt)hashMap_get(admin->subscriptions, key);
+    bool hasSubscription = (topicSub != NULL);
+    if (hasSubscription) {
+        pubsub_topicDecreaseNrSubscribers(topicSub);
+        /* Only the last subscriber actually detaches the endpoint. */
+        if (pubsub_topicGetNrSubscribers(topicSub) == 0) {
+            result = pubsub_topicSubscriptionRemoveSubscriber(topicSub, subEP);
+        }
+    }
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    /* The key was only needed for the lookup above. */
+    free(key);
+
+    if (!hasSubscription) {
+        /* Maybe the endpoint was pending */
+        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+        if (!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)) {
+            result = CELIX_ILLEGAL_STATE;
+        }
+        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+    }
+
+    return result;
+
+}
+
+
+/**
+ * A bundle has shown interest in a publisher service and this PSA has won the
+ * match based on filter or embedded topic.properties (extender pattern)
+ * OR
+ * a remote publication has been discovered and forwarded to this call.
+ *
+ * For an endpoint of this framework (its framework UUID equals ours) the
+ * admin type is set on the endpoint. If such an endpoint has no URL yet, a
+ * local topic publication is created and started for its scope/topic key (or
+ * the endpoint is added to the already existing publication), and the chosen
+ * serializer type is set on the endpoint. If no serializer is found, the
+ * endpoint is appended to admin->noSerializerPublications. Every other
+ * endpoint is appended to the external publication list of its key.
+ *
+ * Afterwards all pending subscriptions for the key are re-submitted through
+ * pubsubAdmin_addSubscription, and the endpoint's URL (if any) is queued for
+ * connection on the matching subscription and on the PUBSUB_ANY_SUB_TOPIC
+ * subscription.
+ *
+ * @param admin  the pubsub admin.
+ * @param pubEP  the publisher endpoint to add.
+ * @return CELIX_SUCCESS; CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID
+ *         cannot be read; CELIX_ILLEGAL_ARGUMENT when the endpoint carries no
+ *         framework UUID; or the status of the serializer lookup /
+ *         publication create / publication start for local publications.
+ */
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *fwUUID = NULL;
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+    if (fwUUID == NULL) {
+        printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    if (epFwUUID == NULL) {
+        /* Comparing against a NULL string is undefined behaviour; reject instead. */
+        printf("PSA_ZMQ: Cannot add publication, endpoint has no framework UUID.\n");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
+
+    if (isOwn) {
+        //should be null, will be set in this call
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+    } else {
+        //inverse: ADMIN_TYPE_KEY and SERIALIZER_TYPE_KEY should not be null
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
+    }
+
+    if (isOwn) {
+        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+    }
+
+
+    /* Heap-allocated lookup key; freed near the end of this function. */
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    /* An own endpoint without URL is a local publication still to be set up;
+     * everything else is recorded as an external publication. */
+    if (isOwn && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+
+        celixThreadMutex_lock(&admin->localPublicationsLock);
+
+        service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
+
+        if (factory == NULL) {
+            topic_publication_pt pub = NULL;
+            pubsub_serializer_service_t *best_serializer = NULL;
+            const char *serType = NULL;
+            if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+                status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+                /* This branch is only reached for own endpoints, so the chosen
+                 * serializer type is always recorded on the endpoint. */
+                properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+            }
+            else {
+                printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerPublications,pubEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+
+            if (status == CELIX_SUCCESS) {
+                status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+                if (status == CELIX_SUCCESS && factory != NULL) {
+                    /* The map keeps its own copy of the key. */
+                    hashMap_put(admin->localPublications, strdup(scope_topic), factory);
+                    connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
+                }
+            } else {
+                printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+                        properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+            }
+        } else {
+            //just add the new EP to the list
+            topic_publication_pt pub = (topic_publication_pt) factory->handle;
+            pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+        }
+
+        celixThreadMutex_unlock(&admin->localPublicationsLock);
+    }
+    else{
+
+        celixThreadMutex_lock(&admin->externalPublicationsLock);
+        array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
+        if (ext_pub_list == NULL) {
+            arrayList_create(&ext_pub_list);
+            hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
+        }
+
+        arrayList_add(ext_pub_list, pubEP);
+
+        celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    }
+
+    /* Re-evaluate the pending subscriptions */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+    hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+    if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
+        char* topic = (char*) hashMapEntry_getKey(pendingSub);
+        array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
+        int i;
+        /* pubsubAdmin_addSubscription takes pendingSubscriptionsLock again on
+         * this thread; the lock is created as a recursive mutex. */
+        for (i = 0; i < arrayList_size(pendingSubList); i++) {
+            pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+            pubsubAdmin_addSubscription(admin, subEP);
+        }
+        hashMap_remove(admin->pendingSubscriptions, scope_topic);
+        arrayList_clear(pendingSubList);
+        arrayList_destroy(pendingSubList);
+        free(topic);
+    }
+
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+    /* Connect the new publisher to the subscription for his topic, if there is any */
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
+    if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    /* And check also for ANY subscription */
+    topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+    if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    free(scope_topic);
+
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        /* Prefix corrected: this is the ZMQ admin, not the UDP multicast one. */
+        printf("PSA_ZMQ: \t [endpoint url = %s, own = %i]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+                isOwn);
+    }
+
+
+    return status;
+
+}
+
+/**
+ * Removes the given publisher endpoint from this admin's bookkeeping.
+ *
+ * An endpoint whose framework UUID equals this framework's UUID is removed from the
+ * local topic publication registered under its scope/topic key or, when no such
+ * publication exists, from the noSerializer pending publications list.
+ * Any other endpoint is removed from the external publications list of its scope/topic
+ * (the list and its key are released once the list is empty).
+ * Finally, a disconnect for the endpoint URL is queued on the matching subscription and
+ * on the ANY subscription, unless another external endpoint with the same URL remains.
+ *
+ * @param admin  the pubsub admin instance
+ * @param pubEP  the publisher endpoint to remove
+ * @return CELIX_SUCCESS on success,
+ *         CELIX_INVALID_BUNDLE_CONTEXT if the framework UUID cannot be retrieved,
+ *         CELIX_ILLEGAL_STATE if a local endpoint is found neither in a publication nor in the pending list
+ */
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+    celix_status_t status = CELIX_SUCCESS;
+    int count = 0;
+
+    /* Looked up once: the URL may legitimately be absent, so every use below is NULL-guarded */
+    const char *pubURL = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", pubURL);
+    }
+
+    const char* fwUUID = NULL;
+
+    bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+    if(fwUUID==NULL){
+        fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    /* NOTE(review): assumes the endpoint always carries a framework UUID property - confirm */
+    if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+        /* Local publisher: detach it from its topic publication */
+        celixThreadMutex_lock(&admin->localPublicationsLock);
+        service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+        if(factory!=NULL){
+            topic_publication_pt pub = (topic_publication_pt)factory->handle;
+            pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
+        }
+        celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+        if(factory==NULL){
+            /* Maybe the endpoint was pending */
+            celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+            if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+                status = CELIX_ILLEGAL_STATE;
+            }
+            celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+        }
+    }
+    else{
+        /* External publisher: drop it from the per-topic list of external publications */
+        celixThreadMutex_lock(&admin->externalPublicationsLock);
+        array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+        if(ext_pub_list!=NULL){
+            int i;
+            bool found = false;
+            for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
+                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                found = pubsubEndpoint_equals(pubEP,p);
+                if (found){
+                    arrayList_remove(ext_pub_list,i);
+                }
+            }
+            // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+            for(i=0; i<arrayList_size(ext_pub_list);i++) {
+                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                const char *otherURL = properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL);
+                /* strcmp on a NULL pointer is undefined behaviour; a missing URL never counts as a match */
+                if (pubURL != NULL && otherURL != NULL && strcmp(pubURL, otherURL) == 0) {
+                    count++;
+                }
+            }
+
+            if(arrayList_size(ext_pub_list)==0){
+                /* Last publisher gone: release the list and the key owned by the map */
+                hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+                char* topic = (char*)hashMapEntry_getKey(entry);
+                array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+                hashMap_remove(admin->externalPublications,topic);
+                arrayList_destroy(list);
+                free(topic);
+            }
+        }
+
+        celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    }
+
+    /* Check if this publisher was connected to one of our subscribers*/
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+    if(sub!=NULL && pubURL!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)pubURL);
+    }
+
+    /* And check also for ANY subscription */
+    topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+    if(any_sub!=NULL && pubURL!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)pubURL);
+    }
+
+    free(scope_topic);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return status;
+
+}
+
+/**
+ * Stops and destroys the local topic publication registered under the key built
+ * from the given scope and topic, if there is one, and releases the map key and
+ * the service factory that were stored with it.
+ *
+ * @param admin  the pubsub admin instance
+ * @param scope  scope part of the lookup key
+ * @param topic  topic part of the lookup key
+ * @return the sum of the stop and destroy statuses (CELIX_SUCCESS when nothing was registered)
+ */
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
+    celix_status_t result = CELIX_SUCCESS;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all publications\n");
+    }
+
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications, lookupKey);
+
+    if (entry != NULL) {
+        /* Grab key and value before the entry is removed from the map */
+        char *storedKey = (char*)hashMapEntry_getKey(entry);
+        service_factory_pt pubFactory = (service_factory_pt)hashMapEntry_getValue(entry);
+        topic_publication_pt publication = (topic_publication_pt)pubFactory->handle;
+
+        result += pubsub_topicPublicationStop(publication);
+        disconnectTopicPubSubFromSerializer(admin, publication, true);
+        result += pubsub_topicPublicationDestroy(publication);
+
+        hashMap_remove(admin->localPublications, lookupKey);
+        free(storedKey);
+        free(pubFactory);
+    }
+
+    free(lookupKey);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+    return result;
+}
+
+/**
+ * Stops and destroys the topic subscription registered under the key built from
+ * the given scope and topic, if there is one, and releases the key stored in the map.
+ *
+ * @param admin  the pubsub admin instance
+ * @param scope  scope part of the lookup key
+ * @param topic  topic part of the lookup key
+ * @return the sum of the stop and destroy statuses (CELIX_SUCCESS when nothing was registered)
+ */
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
+    celix_status_t result = CELIX_SUCCESS;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all subscriptions\n");
+    }
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions, lookupKey);
+
+    if (entry != NULL) {
+        /* Grab key and value before the entry is removed from the map */
+        char *storedKey = (char*)hashMapEntry_getKey(entry);
+        topic_subscription_pt subscription = (topic_subscription_pt)hashMapEntry_getValue(entry);
+
+        result += pubsub_topicSubscriptionStop(subscription);
+        disconnectTopicPubSubFromSerializer(admin, subscription, false);
+        result += pubsub_topicSubscriptionDestroy(subscription);
+
+        hashMap_remove(admin->subscriptions, lookupKey);
+        free(storedKey);
+    }
+
+    free(lookupKey);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return result;
+}
+
+
+#ifndef ANDROID
+/**
+ * Looks up the numeric IPv4 address of a network interface.
+ *
+ * @param interface  name of the interface to look for, or NULL to accept the
+ *                   first IPv4 address reported by getifaddrs
+ * @param ip         out: newly allocated (strdup) address string that the caller
+ *                   must free; only written when CELIX_SUCCESS is returned
+ * @return CELIX_SUCCESS when an address was found and copied,
+ *         CELIX_BUNDLE_EXCEPTION otherwise
+ */
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr = NULL;
+    struct ifaddrs *ifa = NULL;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+        return status;
+    }
+
+    for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) {
+        /* Check the family first: only AF_INET entries are sockaddr_in sized */
+        if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+            continue;
+        }
+        if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+            continue;
+        }
+        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) {
+            continue;
+        }
+
+        /* Report success only when the copy was actually allocated */
+        char *copy = strdup(host);
+        if (copy != NULL) {
+            *ip = copy;
+            status = CELIX_SUCCESS;
+        }
+    }
+
+    freeifaddrs(ifaddr);
+
+    return status;
+}
+#endif
+
+/**
+ * Appends a subscriber endpoint to the pending list kept for its scope/topic key,
+ * creating that list (and a map-owned copy of the key) on first use.
+ *
+ * @param admin  the pubsub admin instance
+ * @param subEP  the subscriber endpoint to park
+ * @return always CELIX_SUCCESS
+ */
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    char *key = pubsubEndpoint_createScopeTopicKey(
+            properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+            properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    array_list_pt pending = hashMap_get(admin->pendingSubscriptions, key);
+    if (pending == NULL) {
+        /* The map keeps its own copy of the key; the local one is freed below */
+        arrayList_create(&pending);
+        hashMap_put(admin->pendingSubscriptions, strdup(key), pending);
+    }
+    arrayList_add(pending, subEP);
+
+    free(key);
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Registers a newly available serializer reference and re-evaluates the endpoints
+ * that were parked because no serializer was available for them.
+ *
+ * @param handle     the pubsub admin instance (pubsub_admin_pt)
+ * @param reference  service reference of the serializer
+ * @param service    the serializer service (not used here)
+ * @return CELIX_SERVICE_EXCEPTION when the reference has no serializer type property,
+ *         CELIX_SUCCESS otherwise
+ */
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
+    /* Assumption: serializers are all available at startup.
+     * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
+
+    const char *serializerType = NULL;
+    serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY, &serializerType);
+    if (serializerType == NULL) {
+        fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
+        return CELIX_SERVICE_EXCEPTION;
+    }
+
+    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    arrayList_add(admin->serializerList, reference);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    /* Retry every endpoint that was waiting for a serializer */
+    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+    int idx = 0;
+    while (idx < arrayList_size(admin->noSerializerSubscriptions)) {
+        pubsub_endpoint_pt pendingSub = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions, idx);
+        pubsub_serializer_service_t *candidate = NULL;
+        pubsubAdmin_getBestSerializer(admin, pendingSub, &candidate, NULL);
+        if (candidate != NULL) {
+            /* A usable serializer exists now */
+            pubsubAdmin_addSubscription(admin, pendingSub);
+        }
+        idx++;
+    }
+
+    idx = 0;
+    while (idx < arrayList_size(admin->noSerializerPublications)) {
+        pubsub_endpoint_pt pendingPub = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications, idx);
+        pubsub_serializer_service_t *candidate = NULL;
+        pubsubAdmin_getBestSerializer(admin, pendingPub, &candidate, NULL);
+        if (candidate != NULL) {
+            /* A usable serializer exists now */
+            pubsubAdmin_addPublication(admin, pendingPub);
+        }
+        idx++;
+    }
+
+    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer added\n", serializerType);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
+    /* Handles the removal of a serializer service: drops its reference from
+     * admin->serializerList, then tears down every topic publication and topic
+     * subscription stored for `service` in the per-serializer maps, moving their
+     * endpoints to the noSerializer pending lists.
+     * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type
+     * property, CELIX_SUCCESS otherwise. */
+    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+    int i=0, j=0;
+    const char *serType = NULL;
+
+    serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+    if (serType == NULL) {
+        printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY); /* NOTE(review): goes to stdout, while pubsubAdmin_serializerAdded sends the same warning to stderr */
+        return CELIX_SERVICE_EXCEPTION;
+    }
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    /* Remove the serializer from the list */
+    arrayList_removeElement(admin->serializerList, reference);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    /* Take both per-serializer lists out of their maps; from here on they are only reachable through these locals */
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+    array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+    array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+    /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
+    if(topicPubList!=NULL){
+        for(i=0;i<arrayList_size(topicPubList);i++){
+            topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
+            /* Stop the topic publication */
+            pubsub_topicPublicationStop(topicPub);
+            /* Get the endpoints that are going to be orphan (the returned list is destroyed below, after the loop) */
+            array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
+            for(j=0;j<arrayList_size(pubList);j++){
+                pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
+                /* Remove the publication */
+                pubsubAdmin_removePublication(admin, pubEP);
+                /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+                if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
+                    properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+                }
+                /* Add the orphan endpoint to the noSerializer pending list */
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerPublications,pubEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+            arrayList_destroy(pubList);
+
+            /* Cleanup also the localPublications hashmap: find the entry whose factory handle is this topicPub */
+            celixThreadMutex_lock(&admin->localPublicationsLock);
+            hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
+            char *key = NULL;
+            service_factory_pt factory = NULL;
+            while(hashMapIterator_hasNext(iter)){
+                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+                factory = (service_factory_pt)hashMapEntry_getValue(entry);
+                topic_publication_pt pub = (topic_publication_pt)factory->handle;
+                if(pub==topicPub){
+                    key = (char*)hashMapEntry_getKey(entry);
+                    break;
+                }
+            }
+            hashMapIterator_destroy(iter);
+            if(key!=NULL){ /* key is only set on a match, so factory is the matching entry's value here */
+                hashMap_remove(admin->localPublications, key);
+                free(factory);
+                free(key);
+            }
+            celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+            /* Finally destroy the topicPublication */
+            pubsub_topicPublicationDestroy(topicPub);
+        }
+        arrayList_destroy(topicPubList);
+    }
+
+    /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
+    if(topicSubList!=NULL){
+        for(i=0;i<arrayList_size(topicSubList);i++){
+            topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
+            /* Stop the topic subscription */
+            pubsub_topicSubscriptionStop(topicSub);
+            /* Get the endpoints that are going to be orphan */
+            array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
+            for(j=0;j<arrayList_size(subList);j++){
+                pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
+                /* Remove the subscription */
+                pubsubAdmin_removeSubscription(admin, subEP);
+                /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
+                if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
+                    properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+                }
+                /* Add the orphan endpoint to the noSerializer pending list */
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerSubscriptions,subEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+            /* NOTE(review): unlike pubList above, subList is not destroyed here - confirm whether
+             * pubsub_topicSubscriptionGetSubscribersList hands out a copy or the subscription's own list */
+            /* Cleanup also the subscriptions hashmap: find the entry whose value is this topicSub */
+            celixThreadMutex_lock(&admin->subscriptionsLock);
+            hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
+            char *key = NULL;
+            while(hashMapIterator_hasNext(iter)){
+                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+                topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
+                if(sub==topicSub){
+                    key = (char*)hashMapEntry_getKey(entry);
+                    break;
+                }
+            }
+            hashMapIterator_destroy(iter);
+            if(key!=NULL){
+                hashMap_remove(admin->subscriptions, key);
+                free(key);
+            }
+            celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+            /* Finally destroy the topicSubscription */
+            pubsub_topicSubscriptionDestroy(topicSub);
+        }
+        arrayList_destroy(topicSubList);
+    }
+
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer removed\n", serType);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Computes the match score of this admin for the given endpoint by delegating to
+ * pubsub_admin_match with this admin's type, score settings and serializer list.
+ *
+ * @param admin     the pubsub admin instance
+ * @param endpoint  the endpoint to evaluate
+ * @param score     out: passed through to pubsub_admin_match
+ * @return CELIX_ILLEGAL_STATE when the framework UUID is not available,
+ *         otherwise the status returned by pubsub_admin_match
+ */
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
+    const char *frameworkUuid = NULL;
+
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &frameworkUuid);
+    if (frameworkUuid == NULL) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
+    /* The serializer list is read by the matcher, so hold its lock for the call */
+    celixThreadMutex_lock(&admin->serializerListLock);
+    celix_status_t matchStatus = pubsub_admin_match(endpoint,
+            PSA_ZMQ_PUBSUB_ADMIN_TYPE,
+            frameworkUuid,
+            admin->qosSampleScore,
+            admin->qosControlScore,
+            admin->defaultScore,
+            admin->serializerList,
+            score);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    return matchStatus;
+}
+
+/**
+ * Selects the best serializer for an endpoint; this recalls the same logic as the match function.
+ *
+ * @param admin       the pubsub admin instance
+ * @param ep          the endpoint whose topic properties drive the selection
+ * @param svcOut      out: the selected serializer service, or NULL when none was found
+ * @param serTypeOut  optional out (may be NULL): serializer type property of the selected reference
+ * @return the status returned by pubsub_admin_get_best_serializer
+ */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) {
+    service_reference_pt bestRef = NULL;
+    pubsub_serializer_service_t *bestSvc = NULL;
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    celix_status_t status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &bestRef);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    if (bestRef != NULL) {
+        bundleContext_getService(admin->bundle_context, bestRef, (void**)&bestSvc);
+        bundleContext_ungetService(admin->bundle_context, bestRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+
+        if (serTypeOut != NULL) {
+            serviceReference_getProperty(bestRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut);
+        }
+    }
+
+    *svcOut = bestSvc;
+    return status;
+}
+
+/**
+ * Records that a topic publication or subscription uses the given serializer by
+ * appending it to the list stored for that serializer (created on first use).
+ *
+ * @param admin          the pubsub admin instance
+ * @param serializer     the serializer service used as map key
+ * @param topicPubSub    the topic publication or topic subscription to record
+ * @param isPublication  true to use the publications map, false for the subscriptions map
+ */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+
+    hash_map_pt perSerializer;
+    if (isPublication) {
+        perSerializer = admin->topicPublicationsPerSerializer;
+    } else {
+        perSerializer = admin->topicSubscriptionsPerSerializer;
+    }
+
+    array_list_pt users = (array_list_pt)hashMap_get(perSerializer, serializer);
+    if (users == NULL) {
+        arrayList_create(&users);
+        hashMap_put(perSerializer, serializer, users);
+    }
+    arrayList_add(users, topicPubSub);
+
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+}
+
+/**
+ * Removes a topic publication or subscription from the per-serializer list that
+ * contains it, stopping at the first list it was removed from.
+ *
+ * @param admin          the pubsub admin instance
+ * @param topicPubSub    the topic publication or topic subscription to remove
+ * @param isPublication  true to search the publications map, false for the subscriptions map
+ */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+
+    hash_map_pt perSerializer;
+    if (isPublication) {
+        perSerializer = admin->topicPublicationsPerSerializer;
+    } else {
+        perSerializer = admin->topicSubscriptionsPerSerializer;
+    }
+
+    hash_map_iterator_pt it = hashMapIterator_create(perSerializer);
+    bool removed = false;
+    while (!removed && hashMapIterator_hasNext(it)) {
+        array_list_pt users = (array_list_pt)hashMapIterator_nextValue(it);
+        removed = arrayList_removeElement(users, topicPubSub);
+    }
+    hashMapIterator_destroy(it);
+
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+}
+


Mime
View raw message