celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [36/60] [abbrv] [partial] celix git commit: CELIX-424: Cleans up the directory structure. Moves all libraries to the libs subdir and all bundles to the bundles subdir
Date Sun, 27 May 2018 18:52:44 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
new file mode 100644
index 0000000..a91e820
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -0,0 +1,64 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.h
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
+
+#include "service_reference.h"
+#include "hash_map.h"
+
+#include "pubsub_common.h"
+
+/**
+ * There should be a pubsub_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+
+typedef struct pubsub_msg_serializer {
+	void* handle;
+	unsigned int msgId;
+	const char* msgName;
+	version_pt msgVersion;
+
+	celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen);
+	celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+	void (*freeMsg)(void* handle, void* msg);
+
+} pubsub_msg_serializer_t;
+
+typedef struct pubsub_serializer_service {
+	void* handle;
+
+	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap);
+	celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap);
+
+} pubsub_serializer_service_t;
+
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor b/bundles/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
new file mode 100644
index 0000000..c01a2fd
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
@@ -0,0 +1,10 @@
+:header
+type=interface
+name=pubsub_topic_info
+version=1.0.0
+:annotations
+:types
+:methods
+getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
+getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
+getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
new file mode 100644
index 0000000..1c92a9b
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -0,0 +1,45 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_UTILS_H_
+#define PUBSUB_UTILS_H_
+
+#include "bundle_context.h"
+#include "array_list.h"
+
+/**
+ * Returns the pubsub info from the provided filter. A pubsub filter should have a topic and can 
+ * have a scope. If no topic is present the topic and scope output will be NULL.
+ * If a topic is present the topic output will contain a allocated topic string and if a scope was
+ * present a allocated scope string.
+ * The caller is owner of the topic and scope output string.
+ */
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topic, char **scope);
+
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+
+
+#endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
new file mode 100644
index 0000000..5d0fcc9
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -0,0 +1,169 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <string.h>
+#include <limits.h>
+
+#include "service_reference.h"
+
+#include "pubsub_admin.h"
+
+#include "pubsub_admin_match.h"
+#include "constants.h"
+
+/*
+ * Match can be called by
+ * a) a local registered pubsub_subscriber service
+ * b) a local opened service tracker for a pubsub_publisher service
+ * c) a remote found publisher endpoint
+ * Note subscribers are not (yet) dicovered remotely
+ */
+celix_status_t pubsub_admin_match(
+		pubsub_endpoint_pt endpoint,
+		const char *pubsub_admin_type,
+		const char *frameworkUuid,
+		double sampleScore,
+		double controlScore,
+		double defaultScore,
+		array_list_pt serializerList,
+		double *out) {
+
+	celix_status_t status = CELIX_SUCCESS;
+	double score = 0;
+
+	const char *endpointFrameworkUuid		= NULL;
+	const char *endpointAdminType 			= NULL;
+
+	const char *requested_admin_type 		= NULL;
+	const char *requested_qos_type			= NULL;
+
+	if (endpoint->endpoint_props != NULL) {
+		endpointFrameworkUuid = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+		endpointAdminType = properties_get(endpoint->endpoint_props, PUBSUB_ENDPOINT_ADMIN_TYPE);
+	}
+	if (endpoint->topic_props != NULL) {
+		requested_admin_type = properties_get(endpoint->topic_props, PUBSUB_ADMIN_TYPE_KEY);
+		requested_qos_type = properties_get(endpoint->topic_props, QOS_ATTRIBUTE_KEY);
+	}
+
+	if (endpointFrameworkUuid != NULL && frameworkUuid != NULL && strncmp(frameworkUuid, endpointFrameworkUuid, 128) == 0) {
+		//match for local subscriber or publisher
+
+		/* Analyze the pubsub_admin */
+		if (requested_admin_type != NULL) { /* We got precise specification on the pubsub_admin we want */
+			if (strncmp(requested_admin_type, pubsub_admin_type, strlen(pubsub_admin_type)) == 0) { //Full match
+				score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
+			}
+		} else if (requested_qos_type != NULL) { /* We got QoS specification that will determine the selected PSA */
+			if (strncmp(requested_qos_type, QOS_TYPE_SAMPLE, strlen(QOS_TYPE_SAMPLE)) == 0) {
+				score = sampleScore;
+			} else if (strncmp(requested_qos_type, QOS_TYPE_CONTROL, strlen(QOS_TYPE_CONTROL)) == 0) {
+				score += controlScore;
+			} else {
+				printf("Unknown QoS type '%s'\n", requested_qos_type);
+				status = CELIX_ILLEGAL_ARGUMENT;
+			}
+		} else { /* We got no specification: fallback to default score */
+			score = defaultScore;
+		}
+
+		//NOTE serializer influence the score if a specific serializer is configured and not available.
+		//get best serializer. This is based on service raking or requested serializer. In the case of a request NULL is return if not request match is found.
+		service_reference_pt serSvcRef = NULL;
+		pubsub_admin_get_best_serializer(endpoint->topic_props, serializerList, &serSvcRef);
+		const char *serType = NULL; //for printing info
+		if (serSvcRef == NULL) {
+			score = 0;
+		} else {
+			serviceReference_getProperty(serSvcRef, PUBSUB_SERIALIZER_TYPE_KEY, &serType);
+		}
+
+		printf("Score for psa type %s is %f. Serializer used is '%s'\n", pubsub_admin_type, score, serType);
+	} else {
+		//remote publisher. score will be 0 or 100. nothing else.
+		//TODO FIXME remote publisher should go through a different process. Currently it is confusing what to match
+		if (endpointAdminType == NULL) {
+			score = 0;
+
+//			const char *key = NULL;
+//			printf("Endpoint properties:\n");
+//			PROPERTIES_FOR_EACH(endpoint->endpoint_props, key) {
+//				printf("\t%s=%s\n", key, properties_get(endpoint->endpoint_props, key));
+//			}
+
+			fprintf(stderr, "WARNING PSA MATCH: remote publisher has no type. The key '%s' must be specified\n", PUBSUB_ENDPOINT_ADMIN_TYPE);
+		} else  {
+			score = strncmp(endpointAdminType, pubsub_admin_type, 1024) == 0 ? 100 : 0;
+		}
+		printf("Score for psa type %s is %f. Publisher is remote\n", pubsub_admin_type, score);
+	}
+
+
+	*out = score;
+
+	return status;
+}
+
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, service_reference_pt *out){
+	celix_status_t status = CELIX_SUCCESS;
+	int i;
+	const char *requested_serializer_type = NULL;
+
+	if (endpoint_props != NULL){
+		requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+	}
+
+	service_reference_pt svcRef = NULL;
+    service_reference_pt best = NULL;
+    long hightestRanking = LONG_MIN;
+
+    if (requested_serializer_type != NULL) {
+        for (i = 0; i < arrayList_size(serializerList); ++i) {
+            svcRef = (service_reference_pt) arrayList_get(serializerList, 0);
+			const char* currentSerType = NULL;
+            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, &currentSerType);
+            if (currentSerType != NULL && strncmp(requested_serializer_type, currentSerType, 128) == 0) {
+                best = svcRef;
+                break;
+            }
+        }
+    } else {
+        //no specific serializer request -> search for highest ranking serializer service
+        for (i = 0; i < arrayList_size(serializerList); ++i) {
+            svcRef = (service_reference_pt)arrayList_get(serializerList,0);
+            const char *service_ranking_str  = NULL;
+			const char* currentSerType = NULL;
+            serviceReference_getProperty(svcRef, OSGI_FRAMEWORK_SERVICE_RANKING, &service_ranking_str);
+			serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, &currentSerType);
+            long svcRanking = service_ranking_str == NULL ? LONG_MIN : strtol(service_ranking_str, NULL, 10);
+            if (best == NULL || (svcRanking > hightestRanking && currentSerType != NULL)) {
+                best = svcRef;
+                hightestRanking = svcRanking;
+            }
+			if (currentSerType == NULL) {
+				fprintf(stderr, "Invalid pubsub_serializer service. Must have a property '%s'\n", PUBSUB_SERIALIZER_TYPE_KEY);
+			}
+        }
+    }
+
+	*out = best;
+
+    return status;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
new file mode 100644
index 0000000..30b34cf
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -0,0 +1,370 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description.c
+ *
+ *  \date       25 Jul 2014
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <uuid/uuid.h>
+
+#include "celix_errno.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "constants.h"
+
+#include "pubsub_utils.h"
+
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId,const char* endpoint, const char *pubsubType, properties_pt topic_props);
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher);
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp);
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long bundleId, long serviceId, const char* endpoint, const char *pubsubType, properties_pt topic_props) {
+
+	if (psEp->endpoint_props == NULL) {
+		psEp->endpoint_props = properties_create();
+	}
+
+	char endpointUuid[37];
+
+	uuid_t endpointUid;
+	uuid_generate(endpointUid);
+	uuid_unparse(endpointUid, endpointUuid);
+
+	properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_UUID, endpointUuid);
+
+	if (fwUUID != NULL) {
+		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, fwUUID);
+	}
+
+	if (scope != NULL) {
+		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+	}
+
+	if (topic != NULL) {
+		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME, topic);
+	}
+
+    char idBuf[32];
+
+    if (bundleId >= 0) {
+        snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID, idBuf);
+    }
+
+    if (serviceId >= 0) {
+        snprintf(idBuf, sizeof(idBuf), "%li", bundleId);
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, idBuf);
+    }
+
+	if(endpoint != NULL) {
+		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint);
+	}
+
+    if (pubsubType != NULL) {
+        properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TYPE, pubsubType);
+    }
+
+	if(topic_props != NULL) {
+        properties_copy(topic_props, &(psEp->topic_props));
+	}
+}
+
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){
+
+	properties_pt topic_props = NULL;
+
+	bool isSystemBundle = false;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	long bundleId = -1;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	bundle_getBundleId(bundle,&bundleId);
+
+	if(isSystemBundle == false) {
+
+		char *bundleRoot = NULL;
+		char* topicPropertiesPath = NULL;
+		bundle_getEntry(bundle, ".", &bundleRoot);
+
+		if(bundleRoot != NULL){
+
+			asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic);
+			topic_props = properties_load(topicPropertiesPath);
+			if(topic_props==NULL){
+				printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId);
+			}
+
+			free(topicPropertiesPath);
+			free(bundleRoot);
+		}
+	}
+
+	return topic_props;
+}
+
+celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	if (ep->endpoint_props == NULL) {
+		printf("PUBSUB_EP: No endpoint_props for endpoint available!\n");
+		return CELIX_ILLEGAL_STATE;
+	}
+
+	if (key != NULL && value != NULL) {
+		properties_set(ep->endpoint_props, key, value);
+	}
+
+	return status;
+}
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long bundleId,  long serviceId, const char* endpoint, const char* pubsubType, properties_pt topic_props,pubsub_endpoint_pt* out){
+	celix_status_t status = CELIX_SUCCESS;
+
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+
+	pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, serviceId, endpoint, pubsubType, topic_props);
+
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){
+	celix_status_t status = CELIX_SUCCESS;
+
+    pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
+
+	status = properties_copy(in->endpoint_props, &(ep->endpoint_props));
+
+    if (in->topic_props != NULL) {
+        status += properties_copy(in->topic_props, &(ep->topic_props));
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = ep;
+    } else {
+        pubsubEndpoint_destroy(ep);
+    }
+
+	return status;
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(bundle_context_t *ctx, service_reference_pt reference, bool isPublisher, pubsub_endpoint_pt* out){
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
+
+	const char* fwUUID = NULL;
+	bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+
+	const char* scope = NULL;
+	serviceReference_getPropertyWithDefault(reference, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, &scope);
+
+	const char* topic = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
+
+	const char* serviceId = NULL;
+	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+
+    long bundleId = -1;
+    bundle_pt bundle = NULL;
+    serviceReference_getBundle(reference, &bundle);
+    if (bundle != NULL) {
+        bundle_getBundleId(bundle, &bundleId);
+    }
+
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+
+    const char *pubsubType = isPublisher ? PUBSUB_PUBLISHER_ENDPOINT_TYPE : PUBSUB_SUBSCRIBER_ENDPOINT_TYPE;
+
+	pubsubEndpoint_setFields(ep, fwUUID, scope, topic, bundleId, strtol(serviceId,NULL,10), NULL, pubsubType, topic_props);
+
+    if (!pubsubEndpoint_isEndpointValid(ep)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = ep;
+    } else {
+        pubsubEndpoint_destroy(ep);
+    }
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromDiscoveredProperties(properties_t *discoveredProperties, pubsub_endpoint_pt* out) {
+    celix_status_t status = CELIX_SUCCESS;
+    
+    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
+    
+    if (psEp == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    psEp->endpoint_props = discoveredProperties;
+
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
+
+    return status;
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(bundle_context_t *ctx, listener_hook_info_pt info, bool isPublisher, pubsub_endpoint_pt* out){
+	celix_status_t status = CELIX_SUCCESS;
+
+	const char* fwUUID=NULL;
+	bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+
+	if( fwUUID==NULL) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	char* topic = NULL;
+	char* scope = NULL;
+	pubsub_getPubSubInfoFromFilter(info->filter, &topic, &scope);
+
+	if (topic==NULL) {
+		free(scope);
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+	if (scope == NULL) {
+		scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+	}
+
+        pubsub_endpoint_pt psEp = calloc(1, sizeof(**out));
+
+	bundle_pt bundle = NULL;
+	long bundleId = -1;
+	bundleContext_getBundle(info->context,&bundle);
+	bundle_getBundleId(bundle,&bundleId);
+
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	pubsubEndpoint_setFields(psEp, fwUUID, scope, topic, bundleId, -1, NULL, PUBSUB_PUBLISHER_ENDPOINT_TYPE, topic_props);
+	free(scope);
+	free(topic);
+
+    if (!pubsubEndpoint_isEndpointValid(psEp)) {
+        status = CELIX_ILLEGAL_STATE;
+    }
+
+    if (status == CELIX_SUCCESS) {
+        *out = psEp;
+    } else {
+        pubsubEndpoint_destroy(psEp);
+    }
+
+	return status;
+}
+
+void pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+    if (psEp == NULL) return;
+
+	if(psEp->topic_props != NULL){
+		properties_destroy(psEp->topic_props);
+	}
+
+	if (psEp->endpoint_props != NULL) {
+		properties_destroy(psEp->endpoint_props);
+    }
+
+	free(psEp);
+
+	return;
+
+}
+
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+
+	if (psEp1->endpoint_props && psEp2->endpoint_props) {
+		return !strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_UUID),
+					  properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_UUID));
+	}else {
+		return false;
+	}
+}
+
+char * pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) {
+	char *result = NULL;
+	asprintf(&result, "%s:%s", scope, topic);
+
+	return result;
+}
+
+
+static bool pubsubEndpoint_isEndpointValid(pubsub_endpoint_pt psEp) {
+    //required properties
+    bool valid = true;
+    static const char* keys[] = {
+        PUBSUB_ENDPOINT_UUID,
+        PUBSUB_ENDPOINT_FRAMEWORK_UUID,
+        PUBSUB_ENDPOINT_TYPE,
+        PUBSUB_ENDPOINT_TOPIC_NAME,
+        PUBSUB_ENDPOINT_TOPIC_SCOPE,
+        NULL };
+    int i;
+    for (i = 0; keys[i] != NULL; ++i) {
+        const char *val = properties_get(psEp->endpoint_props, keys[i]);
+        if (val == NULL) { //missing required key
+            fprintf(stderr, "[ERROR] PubSubEndpoint: Invalid endpoint missing key: '%s'\n", keys[i]);
+            valid = false;
+        }
+    }
+    if (!valid) {
+        const char *key = NULL;
+        fprintf(stderr, "PubSubEndpoint entries:\n");
+        PROPERTIES_FOR_EACH(psEp->endpoint_props, key) {
+            fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->endpoint_props, key));
+        }
+        if (psEp->topic_props != NULL) {
+            fprintf(stderr, "PubSubEndpoint topic properties entries:\n");
+            PROPERTIES_FOR_EACH(psEp->topic_props, key) {
+                fprintf(stderr, "\t'%s' : '%s'\n", key, properties_get(psEp->topic_props, key));
+            }
+        }
+    }
+    return valid;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
new file mode 100644
index 0000000..55185d9
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -0,0 +1,137 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.c
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "constants.h"
+
+#include "pubsub_common.h"
+#include "pubsub/publisher.h"
+#include "pubsub_utils.h"
+
+#include "array_list.h"
+#include "bundle.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define MAX_KEYBUNDLE_LENGTH 256
+
+
+celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topicOut, char **scopeOut) {
+	celix_status_t status = CELIX_SUCCESS;
+	const char *topic = NULL;
+	const char *scope = NULL;
+	const char *objectClass = NULL;
+	celix_filter_t *filter = filter_create(filterstr);
+	if (filter != NULL) {
+		if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=exmpl))
+			array_list_t *attributes = filter->children;
+			unsigned int i;
+			unsigned int size = arrayList_size(attributes);
+			for (i = 0; i < size; ++i) {
+				filter_t *attr = arrayList_get(attributes, i);
+				if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) {
+					if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) {
+						objectClass = attr->value;
+					} else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) {
+						topic = attr->value;
+					} else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) {
+						scope = attr->value;
+					}
+				}
+			}
+		}
+	}
+
+	if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) {
+		//NOTE topic must be present, scope can be present in the filter.
+		*topicOut = strdup(topic);
+                if (scope != NULL) {
+		    *scopeOut = strdup(scope);
+		} else {
+		    *scopeOut = NULL;
+		}
+	} else {
+		*topicOut = NULL;
+		*scopeOut = NULL;
+	}
+
+	if (filter != NULL) {
+             filter_destroy(filter);
+        }
+	return status;
+}
+
+
+/**
+ * Loop through all bundles and look for the bundle with the keys inside.
+ * If no key bundle found, return NULL
+ *
+ * Caller is responsible for freeing the object
+ */
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
+{
+	array_list_pt bundles = NULL;
+	bundleContext_getBundles(ctx, &bundles);
+	int nrOfBundles = arrayList_size(bundles);
+	long bundle_id = -1;
+	char* result = NULL;
+
+	for (int i = 0; i < nrOfBundles; i++){
+		bundle_pt b = arrayList_get(bundles, i);
+
+		/* Skip bundle 0 (framework bundle) since it has no path nor revisions */
+		bundle_getBundleId(b, &bundle_id);
+		if(bundle_id==0){
+			continue;
+		}
+
+		char* dir = NULL;
+		bundle_getEntry(b, ".", &dir);
+
+		char cert_dir[MAX_KEYBUNDLE_LENGTH];
+		snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
+
+		struct stat s;
+		int err = stat(cert_dir, &s);
+		if (err != -1){
+			if (S_ISDIR(s.st_mode)){
+				result = dir;
+				break;
+			}
+		}
+
+		free(dir);
+	}
+
+	arrayList_destroy(bundles);
+
+	return result;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
new file mode 100644
index 0000000..9cb452b
--- /dev/null
+++ b/bundles/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(celix_pubsub_topology_manager
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager"
+    VERSION "1.0.0"
+    SOURCES
+    	src/pstm_activator.c
+    	src/pubsub_topology_manager.c
+    	src/pubsub_topology_manager.h
+)
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi Celix::shell_api)
+get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
+celix_bundle_files(celix_pubsub_topology_manager
+	${DESC}
+    DESTINATION "META-INF/descriptors/services"
+)
+
+install_celix_bundle(celix_pubsub_topology_manager EXPORT celix COMPONENT pubsub)
+
+add_library(Celix::pubsub_topology_manager ALIAS celix_pubsub_topology_manager)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
new file mode 100644
index 0000000..eb874fe
--- /dev/null
+++ b/bundles/pubsub/pubsub_topology_manager/src/pstm_activator.c
@@ -0,0 +1,244 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pstm_activator.c
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "constants.h"
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+
+#include "listener_hook_service.h"
+#include "log_service.h"
+#include "log_helper.h"
+
+
+#include "pubsub_topology_manager.h"
+#include "publisher_endpoint_announce.h"
+
+struct activator {
+	bundle_context_pt context;
+
+	pubsub_topology_manager_pt manager;
+
+	service_tracker_pt pubsubDiscoveryTracker;
+	service_tracker_pt pubsubAdminTracker;
+	service_tracker_pt pubsubSubscribersTracker;
+
+	listener_hook_service_pt hookService;
+	service_registration_pt hook;
+
+	publisher_endpoint_announce_pt publisherEPDiscover;
+	service_registration_pt publisherEPDiscoverService;
+
+	log_helper_pt loghelper;
+};
+
+
+static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
+static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
+static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
+
+
+static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			NULL,
+			pubsub_topologyManager_pubsubDiscoveryAdded,
+			pubsub_topologyManager_pubsubDiscoveryModified,
+			pubsub_topologyManager_pubsubDiscoveryRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, (char *) PUBSUB_DISCOVERY_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
+static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			NULL,
+			pubsub_topologyManager_psaAdded,
+			pubsub_topologyManager_psaModified,
+			pubsub_topologyManager_psaRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, PUBSUB_ADMIN_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
+static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			NULL,
+			pubsub_topologyManager_subscriberAdded,
+			pubsub_topologyManager_subscriberModified,
+			pubsub_topologyManager_subscriberRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker);
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = NULL;
+
+	activator = calloc(1,sizeof(struct activator));
+
+	if (!activator) {
+		return CELIX_ENOMEM;
+	}
+
+	activator->context = context;
+
+	logHelper_create(context, &activator->loghelper);
+	logHelper_start(activator->loghelper);
+
+	status = pubsub_topologyManager_create(context, activator->loghelper, &activator->manager);
+	if (status == CELIX_SUCCESS) {
+		status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
+		if (status == CELIX_SUCCESS) {
+			status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
+			if (status == CELIX_SUCCESS) {
+				status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
+				if (status == CELIX_SUCCESS) {
+					*userData = activator;
+				}
+			}
+		}
+	}
+
+	if(status != CELIX_SUCCESS){
+		bundleActivator_destroy(activator, context);
+	}
+
+	return status;
+}
+
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	publisher_endpoint_announce_pt pubEPDiscover = calloc(1, sizeof(*pubEPDiscover));
+	pubEPDiscover->handle = activator->manager;
+	pubEPDiscover->announcePublisher = pubsub_topologyManager_announcePublisher;
+	pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher;
+	activator->publisherEPDiscover = pubEPDiscover;
+
+	status += bundleContext_registerService(context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, &activator->publisherEPDiscoverService);
+
+
+	listener_hook_service_pt hookService = calloc(1,sizeof(*hookService));
+	hookService->handle = activator->manager;
+	hookService->added = pubsub_topologyManager_publisherTrackerAdded;
+	hookService->removed = pubsub_topologyManager_publisherTrackerRemoved;
+	activator->hookService = hookService;
+
+	status += bundleContext_registerService(context, (char *) OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook);
+
+	/* NOTE: Enable those line in order to remotely expose the topic_info service
+	properties_pt props = properties_create();
+	properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
+	status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
+	*/
+	status += serviceTracker_open(activator->pubsubAdminTracker);
+
+	status += serviceTracker_open(activator->pubsubDiscoveryTracker);
+
+	status += serviceTracker_open(activator->pubsubSubscribersTracker);
+
+
+	return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	serviceTracker_close(activator->pubsubSubscribersTracker);
+	serviceTracker_close(activator->pubsubDiscoveryTracker);
+	serviceTracker_close(activator->pubsubAdminTracker);
+
+	serviceRegistration_unregister(activator->publisherEPDiscoverService);
+	free(activator->publisherEPDiscover);
+
+	serviceRegistration_unregister(activator->hook);
+	free(activator->hookService);
+
+	return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	struct activator *activator = userData;
+	if (activator == NULL) {
+		status = CELIX_BUNDLE_EXCEPTION;
+	} else {
+
+		if(activator->pubsubSubscribersTracker!=NULL){
+			serviceTracker_destroy(activator->pubsubSubscribersTracker);
+		}
+		if(activator->pubsubDiscoveryTracker!=NULL){
+			serviceTracker_destroy(activator->pubsubDiscoveryTracker);
+		}
+		if(activator->pubsubAdminTracker!=NULL){
+			serviceTracker_destroy(activator->pubsubAdminTracker);
+		}
+
+		if(activator->manager!=NULL){
+			status = pubsub_topologyManager_destroy(activator->manager);
+		}
+
+		logHelper_stop(activator->loghelper);
+		logHelper_destroy(&activator->loghelper);
+
+		free(activator);
+	}
+
+	return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
new file mode 100644
index 0000000..5b983d4
--- /dev/null
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -0,0 +1,784 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_topology_manager.c
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "constants.h"
+#include "listener_hook_service.h"
+#include "utils.h"
+#include "log_service.h"
+#include "log_helper.h"
+
+#include "publisher_endpoint_announce.h"
+#include "pubsub_topology_manager.h"
+#include "pubsub_admin.h"
+
+static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
+	for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) {
+		const char* key = (const char*)hashMapIterator_nextKey(&iter);
+		fprintf(outStream, "    Topic=%s\n", key);
+		array_list_pt ep_list = hashMap_get(endpoints, key);
+		for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
+			pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
+			fprintf(outStream, "        Endpoint %d\n", i);
+			fprintf(outStream, "            Endpoint properties\n");
+			const char *propKey;
+			if(ep->endpoint_props) {
+				PROPERTIES_FOR_EACH(ep->endpoint_props, propKey) {
+					fprintf(outStream, "                %s => %s\n", propKey, properties_get(ep->endpoint_props, propKey));
+				}
+			}
+			if(ep->topic_props) {
+				fprintf(outStream, "            Topic properties\n");
+				PROPERTIES_FOR_EACH(ep->topic_props, propKey) {
+					fprintf(outStream, "                %s => %s\n", propKey, properties_get(ep->topic_props, propKey));
+				}
+			}
+		}
+	}
+
+}
+
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt) handle;
+	if (manager->publications && !hashMap_isEmpty(manager->publications)) {
+		fprintf(outStream, "Publications:\n");
+		print_endpoint_info(manager->publications, outStream);
+	}
+	if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) {
+		fprintf(outStream, "Subscriptions:\n");
+		print_endpoint_info(manager->subscriptions, outStream);
+	}
+	return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	*manager = calloc(1, sizeof(**manager));
+	if (!*manager) {
+		return CELIX_ENOMEM;
+	}
+
+	(*manager)->context = context;
+
+	celix_thread_mutexattr_t psaAttr;
+	celixThreadMutexAttr_create(&psaAttr);
+	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+	status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+	celixThreadMutexAttr_destroy(&psaAttr);
+
+	status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+	status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
+	status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+
+	arrayList_create(&(*manager)->psaList);
+
+	(*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+	(*manager)->loghelper = logHelper;
+	(*manager)->shellCmdService.handle = *manager;
+	(*manager)->shellCmdService.executeCommand = shellCommand;
+
+	(*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
+	const char *verboseStr = NULL;
+	bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr);
+	if (verboseStr != NULL) {
+		(*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+	}
+
+	properties_pt shellProps = properties_create();
+	properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
+	properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub");
+	bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+	hashMap_destroy(manager->discoveryList, false, false);
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+	celixThreadMutex_destroy(&manager->discoveryListLock);
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_destroy(manager->psaList);
+	celixThreadMutex_unlock(&manager->psaListLock);
+	celixThreadMutex_destroy(&manager->psaListLock);
+
+	celixThreadMutex_lock(&manager->publicationsLock);
+	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(pubit)){
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
+		unsigned int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
+		}
+		arrayList_destroy(l);
+	}
+	hashMapIterator_destroy(pubit);
+	hashMap_destroy(manager->publications, true, false);
+	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_destroy(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
+	while(hashMapIterator_hasNext(subit)){
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
+		unsigned int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
+		}
+		arrayList_destroy(l);
+	}
+	hashMapIterator_destroy(subit);
+	hashMap_destroy(manager->subscriptions, true, false);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+	celixThreadMutex_destroy(&manager->subscriptionsLock);
+	serviceRegistration_unregister(manager->shellCmdReg);
+	free(manager);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+	unsigned int i;
+
+	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_add(manager->psaList, psa);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+	// Add already detected subscriptions to new PSA
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
+
+	//TODO FIXME no matching used, should only add unmatched subscribers ?
+	//NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
+	while (hashMapIterator_hasNext(subscriptionsIterator)) {
+		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
+		for(i=0;i<arrayList_size(sub_ep_list);i++){
+			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
+		}
+	}
+
+	hashMapIterator_destroy(subscriptionsIterator);
+
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	// Add already detected publications to new PSA
+	status = celixThreadMutex_lock(&manager->publicationsLock);
+	hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
+
+	//TODO FIXME no matching used, should only add unmatched publications ?
+	//NOTE this is a bug which occurs when psa are started after bundles that uses the PSA
+	while (hashMapIterator_hasNext(publicationsIterator)) {
+		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
+		for(i=0;i<arrayList_size(pub_ep_list);i++){
+			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
+		}
+	}
+
+	hashMapIterator_destroy(publicationsIterator);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	// Nop...
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+
+	/* Deactivate all publications */
+	celixThreadMutex_lock(&manager->publicationsLock);
+
+	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(pubit)){
+		hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
+		char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
+		// Extract scope/topic name from key
+		char scope[MAX_SCOPE_LEN];
+		char topic[MAX_TOPIC_LEN];
+		sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
+		array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
+
+		status = psa->closeAllPublications(psa->admin,scope,topic);
+
+		if(status==CELIX_SUCCESS){
+			celixThreadMutex_lock(&manager->discoveryListLock);
+			hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+			while(hashMapIterator_hasNext(iter)){
+				service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+				publisher_endpoint_announce_pt disc = NULL;
+				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+				const char* fwUUID = NULL;
+				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+				unsigned int i;
+				for(i=0;i<arrayList_size(pubEP_list);i++){
+					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+					if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
+						disc->removePublisher(disc->handle,pubEP);
+					}
+				}
+				bundleContext_ungetService(manager->context, disc_sr, NULL);
+			}
+			hashMapIterator_destroy(iter);
+			celixThreadMutex_unlock(&manager->discoveryListLock);
+		}
+	}
+	hashMapIterator_destroy(pubit);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	/* Deactivate all subscriptions */
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
+	while(hashMapIterator_hasNext(subit)){
+		// TODO do some error checking
+		char* scope_topic = (char*)hashMapIterator_nextKey(subit);
+		char scope[MAX_TOPIC_LEN];
+		char topic[MAX_TOPIC_LEN];
+		memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
+		memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
+		sscanf(scope_topic, "%[^:]:%s", scope, topic );
+		status += psa->closeAllSubscriptions(psa->admin,scope, topic);
+	}
+	hashMapIterator_destroy(subit);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_removeElement(manager->psaList, psa);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
+
+	pubsub_endpoint_pt sub = NULL;
+	if(pubsubEndpoint_createFromServiceReference(manager->context, reference,false, &sub) == CELIX_SUCCESS){
+		celixThreadMutex_lock(&manager->subscriptionsLock);
+		char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
+		if(sub_list_by_topic==NULL){
+			arrayList_create(&sub_list_by_topic);
+			hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
+		}
+		free(sub_key);
+		arrayList_add(sub_list_by_topic,sub);
+
+		celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+		unsigned int j;
+		double score = 0;
+		double best_score = 0;
+		pubsub_admin_service_pt best_psa = NULL;
+		celixThreadMutex_lock(&manager->psaListLock);
+		for(j=0;j<arrayList_size(manager->psaList);j++){
+			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+			psa->matchEndpoint(psa->admin,sub,&score);
+			if (score > best_score) { /* We have a new winner! */
+				best_score = score;
+				best_psa = psa;
+			}
+		}
+
+		if (best_psa != NULL && best_score>0) {
+			best_psa->addSubscription(best_psa->admin,sub);
+		}
+
+		// Inform discoveries for interest in the topic
+		celixThreadMutex_lock(&manager->discoveryListLock);
+		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+		while(hashMapIterator_hasNext(iter)){
+			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+			publisher_endpoint_announce_pt disc = NULL;
+			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+			disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			bundleContext_ungetService(manager->context, disc_sr, NULL);
+		}
+		hashMapIterator_destroy(iter);
+		celixThreadMutex_unlock(&manager->discoveryListLock);
+
+		celixThreadMutex_unlock(&manager->psaListLock);
+	}
+	else{
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	// Nop...
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	pubsub_endpoint_pt subcmp = NULL;
+	if(pubsubEndpoint_createFromServiceReference(manager->context, reference, false, &subcmp) == CELIX_SUCCESS){
+
+		unsigned int j,k;
+
+		// Inform discoveries that we not interested in the topic any more
+		celixThreadMutex_lock(&manager->discoveryListLock);
+		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+		while(hashMapIterator_hasNext(iter)){
+			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+			publisher_endpoint_announce_pt disc = NULL;
+			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+			disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			bundleContext_ungetService(manager->context, disc_sr, NULL);
+		}
+		hashMapIterator_destroy(iter);
+		celixThreadMutex_unlock(&manager->discoveryListLock);
+
+		celixThreadMutex_lock(&manager->subscriptionsLock);
+		celixThreadMutex_lock(&manager->psaListLock);
+
+		char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
+		free(sub_key);
+		if(sub_list_by_topic!=NULL){
+			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
+				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
+				if(pubsubEndpoint_equals(sub,subcmp)){
+					for(k=0;k<arrayList_size(manager->psaList);k++){
+						/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
+						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+						psa->removeSubscription(psa->admin,sub);
+					}
+
+				}
+				arrayList_remove(sub_list_by_topic,j);
+
+				/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
+				if(arrayList_size(sub_list_by_topic)==0){
+					for(k=0;k<arrayList_size(manager->psaList);k++){
+						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+						psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+					}
+				}
+
+				pubsubEndpoint_destroy(sub);
+
+			}
+		}
+
+		celixThreadMutex_unlock(&manager->psaListLock);
+		celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+		pubsubEndpoint_destroy(subcmp);
+
+	}
+	else{
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	return status;
+
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
+	publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
+
+	const char* fwUUID = NULL;
+
+	bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+	if(fwUUID==NULL){
+		printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
+		return CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	celixThreadMutex_lock(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+	hashMap_put(manager->discoveryList, reference, NULL);
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+
+	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(iter)){
+		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
+		for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
+			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+			if( (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
+				status += disc->announcePublisher(disc->handle,pubEP);
+			}
+		}
+	}
+	hashMapIterator_destroy(iter);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	iter = hashMapIterator_create(manager->subscriptions);
+
+	while(hashMapIterator_hasNext(iter)) {
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
+		unsigned int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
+
+			disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		}
+	}
+	hashMapIterator_destroy(iter);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
+	if (status == CELIX_SUCCESS) {
+		status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_topology_manager_pt manager = handle;
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+
+
+	if (hashMap_remove(manager->discoveryList, reference)) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
+	}
+
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+
+	return status;
+}
+
+
+celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
+
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	unsigned int l_index;
+
+	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
+
+		listener_hook_info_pt info = arrayList_get(listeners, l_index);
+
+		pubsub_endpoint_pt pub = NULL;
+		if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub) == CELIX_SUCCESS){
+
+			celixThreadMutex_lock(&manager->publicationsLock);
+			char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
+			if(pub_list_by_topic==NULL){
+				arrayList_create(&pub_list_by_topic);
+				hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+			}
+			free(pub_key);
+			arrayList_add(pub_list_by_topic,pub);
+
+			celixThreadMutex_unlock(&manager->publicationsLock);
+
+			unsigned int j;
+			double score = 0;
+			double best_score = 0;
+			pubsub_admin_service_pt best_psa = NULL;
+			celixThreadMutex_lock(&manager->psaListLock);
+
+			for(j=0;j<arrayList_size(manager->psaList);j++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+				psa->matchEndpoint(psa->admin,pub,&score);
+				if(score>best_score){ /* We have a new winner! */
+					best_score = score;
+					best_psa = psa;
+				}
+			}
+
+			if (best_psa != NULL && best_score > 0) {
+				status = best_psa->addPublication(best_psa->admin,pub);
+				if(status==CELIX_SUCCESS){
+					celixThreadMutex_lock(&manager->discoveryListLock);
+					hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+					while(hashMapIterator_hasNext(iter)){
+						service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+						publisher_endpoint_announce_pt disc = NULL;
+						bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+						disc->announcePublisher(disc->handle,pub);
+						bundleContext_ungetService(manager->context, disc_sr, NULL);
+					}
+					hashMapIterator_destroy(iter);
+					celixThreadMutex_unlock(&manager->discoveryListLock);
+				}
+			}
+
+			celixThreadMutex_unlock(&manager->psaListLock);
+
+		}
+
+	}
+
+	return status;
+
+}
+
+
+celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	unsigned int l_index;
+
+	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
+
+		listener_hook_info_pt info = arrayList_get(listeners, l_index);
+
+		pubsub_endpoint_pt pubcmp = NULL;
+		if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){
+
+
+			unsigned int j,k;
+			celixThreadMutex_lock(&manager->psaListLock);
+			celixThreadMutex_lock(&manager->publicationsLock);
+
+			char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+			if(pub_list_by_topic!=NULL){
+				for(j=0;j<arrayList_size(pub_list_by_topic);j++){
+					pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
+					if(pubsubEndpoint_equals(pub,pubcmp)){
+						for(k=0;k<arrayList_size(manager->psaList);k++){
+							pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+							status = psa->removePublication(psa->admin,pub);
+							if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
+								celixThreadMutex_lock(&manager->discoveryListLock);
+								hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+								while(hashMapIterator_hasNext(iter)){
+									service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+									publisher_endpoint_announce_pt disc = NULL;
+									bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+									disc->removePublisher(disc->handle,pub);
+									bundleContext_ungetService(manager->context, disc_sr, NULL);
+								}
+								hashMapIterator_destroy(iter);
+								celixThreadMutex_unlock(&manager->discoveryListLock);
+							}
+							else if(status ==  CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
+								status = CELIX_SUCCESS;
+							}
+						}
+						//}
+						arrayList_remove(pub_list_by_topic,j);
+
+						/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
+						if(arrayList_size(pub_list_by_topic)==0){
+							for(k=0;k<arrayList_size(manager->psaList);k++){
+								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+								psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+							}
+						}
+
+						pubsubEndpoint_destroy(pub);
+					}
+
+				}
+			}
+
+			celixThreadMutex_unlock(&manager->publicationsLock);
+			celixThreadMutex_unlock(&manager->psaListLock);
+
+			free(pub_key);
+
+			pubsubEndpoint_destroy(pubcmp);
+
+		}
+
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+    pubsub_topology_manager_pt manager = handle;
+
+    if (manager->verbose) {
+        printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	celixThreadMutex_lock(&manager->publicationsLock);
+
+	char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+	if(pub_list_by_topic==NULL){
+		arrayList_create(&pub_list_by_topic);
+		hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+	}
+	free(pub_key);
+
+	/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
+	pubsub_endpoint_pt p = NULL;
+	pubsubEndpoint_clone(pubEP, &p);
+	arrayList_add(pub_list_by_topic , p);
+
+	unsigned int j;
+	double score = 0;
+	double best_score = 0;
+	pubsub_admin_service_pt best_psa = NULL;
+
+	for(j=0;j<arrayList_size(manager->psaList);j++){
+		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+		psa->matchEndpoint(psa->admin , p, &score);
+		if (score>best_score) { /* We have a new winner! */
+			best_score = score;
+			best_psa = psa;
+		}
+	}
+
+	if(best_psa != NULL && best_score>0) {
+        //TODO FIXME this the same call as used by publisher of service trackers. This is confusing.
+        //remote discovered publication can be handle different.
+		best_psa->addPublication(best_psa->admin,p);
+	}
+	else{
+		status = CELIX_ILLEGAL_STATE;
+	}
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+    pubsub_topology_manager_pt manager = handle;
+
+    if (manager->verbose) {
+        printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID));
+    }
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	celixThreadMutex_lock(&manager->publicationsLock);
+	unsigned int i;
+
+	char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+	if(pub_list_by_topic==NULL){
+		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+		status = CELIX_ILLEGAL_STATE;
+	}
+	else{
+
+		pubsub_endpoint_pt p = NULL;
+		bool found = false;
+
+		for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
+			p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
+			found = pubsubEndpoint_equals(p,pubEP);
+		}
+
+		if(found && p !=NULL){
+
+			for(i=0;i<arrayList_size(manager->psaList);i++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
+				/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
+				psa->removePublication(psa->admin,p);
+			}
+
+			arrayList_removeElement(pub_list_by_topic,p);
+
+			/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
+			if(arrayList_size(pub_list_by_topic)==0){
+
+				for(i=0;i<arrayList_size(manager->psaList);i++){
+					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
+					psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+				}
+			}
+
+			pubsubEndpoint_destroy(p);
+		}
+
+
+	}
+	free(pub_key);
+	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+
+	return status;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
new file mode 100644
index 0000000..769048d
--- /dev/null
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -0,0 +1,92 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_topology_manager.h
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_TOPOLOGY_MANAGER_H_
+#define PUBSUB_TOPOLOGY_MANAGER_H_
+
+#include "service_reference.h"
+#include "bundle_context.h"
+#include "log_helper.h"
+#include "command.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+	#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY 		"PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
+#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE		false
+
+
+struct pubsub_topology_manager {
+	bundle_context_pt context;
+
+	celix_thread_mutex_t psaListLock;
+	array_list_pt psaList;
+
+	celix_thread_mutex_t discoveryListLock;
+	hash_map_pt discoveryList; //<serviceReference,NULL>
+
+	celix_thread_mutex_t publicationsLock;
+	hash_map_pt publications; //<topic(string),list<pubsub_ep>>
+
+	celix_thread_mutex_t subscriptionsLock;
+	hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
+
+	command_service_t shellCmdService;
+	service_registration_pt  shellCmdReg;
+
+
+	log_helper_pt loghelper;
+
+	bool verbose;
+};
+
+typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
+
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager);
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
+celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
+
+celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service);
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service);
+celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service);
+celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service);
+
+celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners);
+celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners);
+
+celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+
+#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
new file mode 100644
index 0000000..39e5575
--- /dev/null
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_program(ETCD_CMD NAMES etcd)
+
+find_package(CppUTest REQUIRED)
+include_directories(${CPPUTEST_INCLUDE_DIR})
+
+add_celix_bundle(pubsub_sut
+    #"Vanilla" bundle which is under test
+    SOURCES
+        test/sut_activator.c
+    VERSION 1.0.0
+)
+target_include_directories(pubsub_sut PRIVATE test)
+target_link_libraries(pubsub_sut PRIVATE Celix::pubsub_spi)
+
+celix_bundle_files(pubsub_sut
+    msg_descriptors/msg.descriptor
+    msg_descriptors/sync.descriptor
+    DESTINATION "META-INF/descriptors/messages"
+)
+
+add_celix_container(pubsub_udpmc_sut
+    NAME deploy_sut
+    BUNDLES
+        celix_pubsub_serializer_json
+        celix_pubsub_discovery_etcd
+        celix_pubsub_admin_udp_multicast
+        celix_pubsub_topology_manager
+        pubsub_sut
+    DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
+)
+add_celix_container(pubsub_zmq_sut
+    NAME deploy_sut
+    BUNDLES
+        celix_pubsub_serializer_json
+        celix_pubsub_discovery_etcd
+        celix_pubsub_admin_zmq
+        celix_pubsub_topology_manager
+        pubsub_sut
+    DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/zmq
+)
+
+add_celix_bundle(pubsub_tst
+    #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
+    SOURCES
+        test/tst_activator.cpp
+    VERSION 1.0.0
+)
+if (APPLE)
+    #Note that the launcher celix_test_runner is linked with CppuTest, not the bundle libs. Default libCppUTest.a is not compiled for relocation 
+    target_link_libraries(pubsub_tst PRIVATE Celix::framework -Wl,-undefined -Wl,dynamic_lookup)
+else ()
+    target_link_libraries(pubsub_tst PRIVATE Celix::framework)
+endif ()
+
+celix_bundle_files(pubsub_tst
+    msg_descriptors/msg.descriptor
+    msg_descriptors/sync.descriptor
+    DESTINATION "META-INF/descriptors/messages"
+)
+add_celix_container(pubsub_udpmc_tst
+    NAME deploy_tst
+    BUNDLES
+        celix_pubsub_serializer_json
+        celix_pubsub_topology_manager
+        celix_pubsub_discovery_etcd
+        celix_pubsub_admin_udp_multicast
+        pubsub_tst
+    #NOTE using $<TARGET_PROPERTY:pubsub_test_udpmc_runtime,RUNTIME_DIR> in DIR not (yet) possible
+    DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
+    LAUNCHER celix_test_runner
+)
+add_celix_container(pubsub_zmq_tst
+    NAME deploy_tst
+    BUNDLES
+        celix_pubsub_serializer_json
+        celix_pubsub_topology_manager
+        celix_pubsub_discovery_etcd
+        celix_pubsub_admin_zmq
+        pubsub_tst
+    DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/zmq
+    LAUNCHER celix_test_runner
+)
+
+if (ETCD_CMD)
+    add_runtime(pubsub_test_udpmc_runtime
+        NAME udpmc
+        GROUP test/pubsub
+        DEPLOYMENTS
+            pubsub_udpmc_sut
+            pubsub_udpmc_tst
+        COMMANDS
+            etcd
+        WAIT_FOR
+            pubsub_udpmc_tst
+        LOG_TO_FILES
+        #USE_TERM
+    )
+    add_test(NAME pubsub_udpmc_test
+	    COMMAND $<TARGET_PROPERTY:pubsub_test_udpmc_runtime,RUNTIME_LOC>/start.sh
+    )
+
+    add_runtime(pubsub_test_zmq_runtime
+        NAME zmq
+        GROUP test/pubsub
+        DEPLOYMENTS
+            pubsub_zmq_sut
+            pubsub_zmq_tst
+        COMMANDS
+            etcd
+        ARGUMENTS
+            pubsub_zmq_tst "-o junit"
+        WAIT_FOR
+            pubsub_zmq_tst
+        LOG_TO_FILES
+        #USE_TERM
+    )
+    add_test(NAME pubsub_zmq_test
+	    COMMAND $<TARGET_PROPERTY:pubsub_test_zmq_runtime,RUNTIME_LOC>/start.sh
+    )
+endif ()

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/msg_descriptors/msg.descriptor b/bundles/pubsub/test/msg_descriptors/msg.descriptor
new file mode 100644
index 0000000..0eb28cb
--- /dev/null
+++ b/bundles/pubsub/test/msg_descriptors/msg.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=msg
+version=1.0.0
+:annotations
+classname=org.example.Msg
+:types
+:message
+{i seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/msg_descriptors/sync.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/msg_descriptors/sync.descriptor b/bundles/pubsub/test/msg_descriptors/sync.descriptor
new file mode 100644
index 0000000..529ba71
--- /dev/null
+++ b/bundles/pubsub/test/msg_descriptors/sync.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=sync
+version=1.0.0
+:annotations
+classname=org.example.Sync
+:types
+:message
+{F nop}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/test/msg.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/msg.h b/bundles/pubsub/test/test/msg.h
new file mode 100644
index 0000000..952e921
--- /dev/null
+++ b/bundles/pubsub/test/test/msg.h
@@ -0,0 +1,31 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef MSG_H
+#define MSG_H
+
+#include <stdint.h>
+
+#define MSG_NAME "msg"
+
+typedef struct msg {
+    uint32_t seqNr;
+} msg_t;
+
+#endif //MSG_H

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/sut_activator.c b/bundles/pubsub/test/test/sut_activator.c
new file mode 100644
index 0000000..717b540
--- /dev/null
+++ b/bundles/pubsub/test/test/sut_activator.c
@@ -0,0 +1,115 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <constants.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+
+static int sut_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+static int sut_pubAdded(void *handle, service_reference_pt reference, void *service);
+static int sut_pubRemoved(void *handle, service_reference_pt reference, void *service);
+
+
+struct activator {
+	pubsub_subscriber_t subSvc;
+	service_registration_pt reg;
+
+	service_tracker_pt tracker;
+
+	pthread_mutex_t mutex;
+	pubsub_publisher_t* pubSvc;
+};
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	struct activator* act = malloc(sizeof(*act));
+	*userData = act;
+	return CELIX_SUCCESS;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	struct activator* act = (struct activator*) userData;
+
+	properties_pt props = properties_create();
+	properties_set(props, "pubsub.topic", "ping");
+	act->subSvc.handle = act;
+	act->subSvc.receive = sut_receive;
+	act->reg = NULL;
+	bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &act->subSvc, props, &act->reg);
+
+	char filter[512];
+	snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "pong");
+
+	service_tracker_customizer_pt customizer = NULL;
+	serviceTrackerCustomizer_create(act, NULL, sut_pubAdded, NULL, sut_pubRemoved, &customizer);
+	serviceTracker_createWithFilter(context, filter, customizer, &act->tracker);
+	serviceTracker_open(act->tracker);
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
+	struct activator* act = userData;
+	serviceTracker_close(act->tracker);
+	return CELIX_SUCCESS;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt  __attribute__((unused)) context) {
+	struct activator* act = userData;
+	serviceTracker_destroy(act->tracker);
+	return CELIX_SUCCESS;
+}
+
+static int sut_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release) {
+	struct activator* act = handle;
+	printf("Received msg '%s', sending back\n", msgType);
+	pthread_mutex_lock(&act->mutex);
+	if (act->pubSvc != NULL) {
+		unsigned int sendId = 0;
+		act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, msgType, &sendId);
+		act->pubSvc->send(act->pubSvc->handle, sendId, msg);
+	}
+	pthread_mutex_unlock(&act->mutex);
+	return CELIX_SUCCESS;
+}
+
+static int sut_pubAdded(void *handle, service_reference_pt reference, void *service) {
+	struct activator* act = handle;
+	pthread_mutex_lock(&act->mutex);
+	act->pubSvc = service;
+	pthread_mutex_unlock(&act->mutex);
+	return CELIX_SUCCESS;
+
+}
+
+static int sut_pubRemoved(void *handle, service_reference_pt reference, void *service) {
+	struct activator* act = handle;
+	pthread_mutex_lock(&act->mutex);
+	if (act->pubSvc == service) {
+		act->pubSvc = NULL;
+	}
+	pthread_mutex_unlock(&act->mutex);
+	return CELIX_SUCCESS;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/test/test/sync.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/test/test/sync.h b/bundles/pubsub/test/test/sync.h
new file mode 100644
index 0000000..0f047ca
--- /dev/null
+++ b/bundles/pubsub/test/test/sync.h
@@ -0,0 +1,29 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef SYNC_H
+#define SYNC_H
+
+#define SYNC_NAME "sync"
+
+typedef struct sync {
+    float nop; //need at least one entry
+} sync_t;
+
+#endif //SYNC_H


Mime
View raw message