celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [38/51] [partial] celix git commit: CELIX-424: Cleans up the directory structure. Moves all libraries to the libs subdir and all bundles to the bundles subdir
Date Sun, 27 May 2018 18:37:21 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
new file mode 100644
index 0000000..c788382
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -0,0 +1,113 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.h
+ *
+ *  \date       Dec 5, 2013
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_
+#define PUBSUB_ADMIN_ZMQ_IMPL_H_
+
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include "pubsub_psa_zmq_constants.h"
+#include "pubsub_admin.h"
+#include "pubsub_admin_match.h"
+#include "log_helper.h"
+#include "command.h"
+
+
+/*
+ * State kept by one pubsub admin instance (ZMQ variant, per the include guard
+ * of this header).
+ *
+ * NOTE(review): each *Lock member is presumably the guard for the collection(s)
+ * declared right after it - confirm against the implementation file.
+ */
+struct pubsub_admin {
+
+	bundle_context_pt bundle_context;
+	log_helper_pt loghelper;
+
+	/* List of the available serializers */
+	celix_thread_mutex_t serializerListLock;
+	array_list_pt serializerList; // List<serializers service references>
+
+	celix_thread_mutex_t localPublicationsLock;
+	hash_map_pt localPublications;//<topic(string),service_factory_pt>
+
+	celix_thread_mutex_t externalPublicationsLock;
+	hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
+
+	celix_thread_mutex_t subscriptionsLock;
+	hash_map_pt subscriptions; //<topic(string),topic_subscription>
+
+	celix_thread_mutex_t pendingSubscriptionsLock;
+	celix_thread_mutexattr_t pendingSubscriptionsAttr; // attributes for pendingSubscriptionsLock (presumably; confirm at creation site)
+	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
+
+	/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
+	celix_thread_mutex_t noSerializerPendingsLock;
+	celix_thread_mutexattr_t noSerializerPendingsAttr; // attributes for noSerializerPendingsLock (presumably; confirm at creation site)
+	array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+	array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+	celix_thread_mutex_t usedSerializersLock;
+	hash_map_pt topicSubscriptionsPerSerializer; // <serializer,List<topicSubscription>>
+	hash_map_pt topicPublicationsPerSerializer; // <serializer,List<topicPublications>>
+
+	char* ipAddress;
+
+	zactor_t* zmq_auth;
+
+	/* Port range; presumably forwarded as basePort/maxPort to pubsub_topicPublicationCreate - TODO confirm */
+    unsigned int basePort;
+    unsigned int maxPort;
+
+	command_service_t shellCmdService;
+	service_registration_pt  shellCmdReg;
+
+	/* Matching scores; defaults and property keys are declared in pubsub_psa_zmq_constants.h */
+	double qosSampleScore;
+	double qosControlScore;
+	double defaultScore;
+
+	bool verbose;
+};
+
+/*
+ * Entry points of the admin. The implementations are not part of this header;
+ * the notes below are derived from the signatures only.
+ */
+
+/* Lifecycle: create hands the new instance back through *admin. */
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+
+/* Add/remove a subscriber endpoint. */
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+/* Add/remove a publisher endpoint. */
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP);
+
+/* Close everything registered for a scope/topic pair. */
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic);
+
+/* Serializer tracking callbacks; NOTE(review): handle is presumably the pubsub_admin_pt - confirm at the registration site. */
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
+
+/* Reports through *score how well this admin matches the given endpoint. */
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+
+#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
new file mode 100644
index 0000000..211439e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -0,0 +1,48 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+
+#ifndef PUBSUB_PSA_ZMQ_CONSTANTS_H_
+#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
+
+
+
+/* Admin type string; topic_publication.c stores it in the endpoint field PUBSUB_ADMIN_TYPE_KEY. */
+#define PSA_ZMQ_PUBSUB_ADMIN_TYPE	            "zmq"
+
+/* Names for the port range settings (presumably configuration property keys - confirm in the admin implementation). */
+#define PSA_ZMQ_BASE_PORT                       "PSA_ZMQ_BASE_PORT"
+#define PSA_ZMQ_MAX_PORT                        "PSA_ZMQ_MAX_PORT"
+
+/* Default port range. */
+#define PSA_ZMQ_DEFAULT_BASE_PORT               5501
+#define PSA_ZMQ_DEFAULT_MAX_PORT                6000
+
+/* Default matching scores. */
+#define PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE 	    30
+#define PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE 	    70
+#define PSA_ZMQ_DEFAULT_SCORE 				    30
+
+/* Names for the score settings (presumably configuration property keys - confirm). */
+#define PSA_ZMQ_QOS_SAMPLE_SCORE_KEY 		    "PSA_ZMQ_QOS_SAMPLE_SCORE"
+#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY 		    "PSA_ZMQ_QOS_CONTROL_SCORE"
+#define PSA_ZMQ_DEFAULT_SCORE_KEY 			    "PSA_ZMQ_DEFAULT_SCORE"
+
+/* Verbosity default and the name of its setting. */
+#define PSA_ZMQ_DEFAULT_VERBOSE 			    false
+#define PSA_ZMQ_VERBOSE_KEY		 			    "PSA_ZMQ_VERBOSE"
+
+
+
+#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
new file mode 100644
index 0000000..c81107d
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -0,0 +1,640 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_registration.h"
+#include "utils.h"
+#include "service_factory.h"
+#include "version.h"
+
+#include "pubsub_common.h"
+#include "pubsub_utils.h"
+#include "pubsub/publisher.h"
+
+#include "topic_publication.h"
+
+#include "pubsub_serializer.h"
+#include "pubsub_psa_zmq_constants.h"
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	#include "zmq_crypto.h"
+
+	#define MAX_CERT_PATH_LENGTH 512
+#endif
+
+#define EP_ADDRESS_LEN		32
+#define ZMQ_BIND_MAX_RETRY	5
+
+#define FIRST_SEND_DELAY	2
+
+/*
+ * One publication: a bound ZMQ_PUB socket plus the publisher endpoints and the
+ * per-bundle bound services that share it.
+ */
+struct topic_publication {
+	zsock_t* zmq_socket;
+	/* NOTE(review): in this file socket_lock is only taken in pubsub_topicPublicationDestroy;
+	 * the send path runs under tp_lock and mp_lock instead. */
+	celix_thread_mutex_t socket_lock; //Protects zmq_socket access
+	zcert_t * zmq_cert; // only assigned when BUILD_WITH_ZMQ_SECURITY is set and the endpoint is secure
+	char* endpoint; // heap string "tcp://<bindIP>:<port>", freed in pubsub_topicPublicationDestroy
+	service_registration_pt svcFactoryReg; // filled by pubsub_topicPublicationStart
+	array_list_pt pub_ep_list; //List<pubsub_endpoint>
+	hash_map_pt boundServices; //<bundle_pt,bound_service>
+	struct {
+		const char* type;
+		pubsub_serializer_service_t *svc;
+	} serializer;
+	celix_thread_mutex_t tp_lock;
+};
+
+/*
+ * Publisher service instance handed to one requesting bundle. Created on the
+ * first getService call for that bundle and destroyed when getCount drops to 0.
+ */
+typedef struct publish_bundle_bound_service {
+	topic_publication_pt parent;
+	pubsub_publisher_t service; // its handle points back at this struct
+	bundle_pt bundle;
+	char *topic; // strdup'ed topic name, owned by this struct
+	hash_map_pt msgTypes; // filled by the serializer's createSerializerMap; looked up by msgTypeId when sending
+	unsigned short getCount; // number of outstanding getService calls for this bundle
+	celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
+	bool mp_send_in_progress; // true between a FIRST_MSG part and the matching LAST_MSG part
+	array_list_pt mp_parts; // queued pubsub_msg parts of the multipart message being built
+}* publish_bundle_bound_service_pt;
+
+/* Note: correct locking order is
+ * 1. tp_lock
+ * 2. mp_lock
+ * 3. socket_lock
+ *
+ * tp_lock and socket_lock are independent.
+ */
+
+/*
+ * One message (or one part of a multipart message) waiting to be sent.
+ * header, payload and the struct itself are released with free() by
+ * send_pubsub_msg once the frames have been handed to the socket.
+ */
+typedef struct pubsub_msg{
+	pubsub_msg_header_pt header;
+	char* payload; // serializer output
+	int payloadSize; // length of payload as reported by the serializer
+}* pubsub_msg_pt;
+
+/* Forward declarations of the file-local helpers defined further down. */
+
+/* Picks the port used for a bind attempt. */
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+/* Service factory callbacks; handle is the topic_publication_pt. */
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
+
+/* Lifetime of the per-bundle publisher service. */
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+
+/* pubsub_publisher_t callbacks; handle is the publish_bundle_bound_service_pt. */
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
+
+/* Sleeps once, before the very first frame is sent. */
+static void delay_first_send_for_late_joiners(void);
+
+/**
+ * Creates a topic publication for pubEP.
+ *
+ * Opens a ZMQ_PUB socket and binds it to tcp://0.0.0.0:<port>, where <port> is
+ * drawn with rand_range(basePort, maxPort); at most ZMQ_BIND_MAX_RETRY ports are
+ * tried. "tcp://<bindIP>:<port>" is stored as the advertised endpoint and pubEP
+ * is added as first publisher endpoint of the new publication.
+ *
+ * With BUILD_WITH_ZMQ_SECURITY, a topic listed in the "SECURE_TOPICS" property is
+ * marked secure and its certificate is applied to the socket; if the certificate
+ * cannot be loaded the topic falls back to unsecured.
+ *
+ * Every resource acquired here (socket, certificate, buffers) is released again
+ * on each failure path.
+ *
+ * \param out receives the new publication; only written on success.
+ * \return CELIX_SUCCESS, CELIX_ENOMEM when an allocation fails, or
+ *         CELIX_SERVICE_EXCEPTION when the keys bundle dir is missing, the
+ *         socket cannot be created or none of the tried ports could be bound.
+ */
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+	celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	char* secure_topics = NULL;
+	bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
+
+	if (secure_topics){
+		array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
+
+		int i;
+		int secure_topics_size = arrayList_size(secure_topics_list);
+		for (i = 0; i < secure_topics_size; i++){
+			char* top = arrayList_get(secure_topics_list, i);
+			if (strcmp(pubEP->topic, top) == 0){
+				printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
+				pubEP->is_secure = true;
+			}
+			free(top);
+			top = NULL;
+		}
+
+		arrayList_destroy(secure_topics_list);
+	}
+
+	zcert_t* pub_cert = NULL;
+	if (pubEP->is_secure){
+		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+		if (keys_bundle_dir == NULL){
+			return CELIX_SERVICE_EXCEPTION;
+		}
+
+		const char* keys_file_path = NULL;
+		const char* keys_file_name = NULL;
+		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+		char cert_path[MAX_CERT_PATH_LENGTH];
+
+		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key.enc"
+		snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
+		free(keys_bundle_dir);
+		printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
+
+		pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
+		if (pub_cert == NULL){
+			printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
+			printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
+			pubEP->is_secure = false;
+		}
+	}
+#endif
+
+	zsock_t* socket = zsock_new (ZMQ_PUB);
+	if(socket==NULL){
+		/* Report first: the cleanup below may overwrite errno */
+		perror("Error for zmq_socket");
+
+		#ifdef BUILD_WITH_ZMQ_SECURITY
+			if (pubEP->is_secure){
+				zcert_destroy(&pub_cert);
+			}
+		#endif
+
+		return CELIX_SERVICE_EXCEPTION;
+	}
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	if (pubEP->is_secure){
+		zcert_apply (pub_cert, socket); // apply certificate to socket
+		zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
+	}
+#endif
+
+	int rv = -1, retry=0;
+	char bindAddress[EP_ADDRESS_LEN];
+
+	/* Allocate everything up front so that a single cleanup path covers all failures */
+	char* ep = malloc(EP_ADDRESS_LEN);
+	topic_publication_pt pub = calloc(1,sizeof(*pub));
+	if(ep==NULL || pub==NULL){
+		status = CELIX_ENOMEM;
+	}
+
+	while(status==CELIX_SUCCESS && rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
+		/* Randomized part due to same bundle publishing on different topics */
+		unsigned int port = rand_range(basePort,maxPort);
+		memset(ep,0,EP_ADDRESS_LEN);
+		memset(bindAddress, 0, EP_ADDRESS_LEN);
+
+		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
+		snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind address than endpoint address
+		rv = zsock_bind (socket, "%s", bindAddress);
+		if (rv == -1) {
+			perror("Error for zmq_bind");
+		}
+		retry++;
+	}
+
+	if(status==CELIX_SUCCESS && rv==-1){
+		status = CELIX_SERVICE_EXCEPTION;
+	}
+
+	if(status!=CELIX_SUCCESS){
+		/* Nothing has been handed over yet: release socket, certificate and buffers */
+		free(ep);
+		free(pub);
+		zsock_destroy(&socket);
+#ifdef BUILD_WITH_ZMQ_SECURITY
+		zcert_destroy(&pub_cert); // no-op when no certificate was loaded
+#endif
+		return status;
+	}
+
+	/* ZMQ stuffs are all fine at this point. Let's initialize the structure */
+
+	arrayList_create(&(pub->pub_ep_list));
+	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
+	celixThreadMutex_create(&(pub->tp_lock),NULL);
+
+	pub->endpoint = ep;
+	pub->zmq_socket = socket;
+	pub->serializer.svc = best_serializer;
+	pub->serializer.type = serType;
+
+	celixThreadMutex_create(&(pub->socket_lock),NULL);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	if (pubEP->is_secure){
+		pub->zmq_cert = pub_cert;
+	}
+#endif
+
+	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+
+	*out = pub;
+
+	return status;
+}
+
+/**
+ * Releases a publication created by pubsub_topicPublicationCreate: the endpoint
+ * string, the endpoint list (the list only, not the endpoints it points to), all
+ * per-bundle bound services, the certificate (security builds), the socket, both
+ * mutexes and finally the struct itself.
+ *
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	free(pub->endpoint);
+	arrayList_destroy(pub->pub_ep_list);
+
+	/* Destroy every bound service still registered, then the map itself */
+	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(iter)){
+		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+		pubsub_destroyPublishBundleBoundService(bound);
+	}
+	hashMapIterator_destroy(iter);
+	hashMap_destroy(pub->boundServices,false,false);
+
+	pub->svcFactoryReg = NULL;
+	pub->serializer.svc = NULL;
+	pub->serializer.type = NULL;
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	zcert_destroy(&(pub->zmq_cert));
+#endif
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	celixThreadMutex_destroy(&(pub->tp_lock));
+
+	/* The socket is torn down under its own lock, after tp_lock is already gone */
+	celixThreadMutex_lock(&(pub->socket_lock));
+	zsock_destroy(&(pub->zmq_socket));
+	celixThreadMutex_unlock(&(pub->socket_lock));
+
+	celixThreadMutex_destroy(&(pub->socket_lock));
+
+	free(pub);
+
+	return status;
+}
+
+/**
+ * Registers the publisher service factory of this publication.
+ *
+ * The topic and scope service properties are taken from the first endpoint in
+ * pub->pub_ep_list. On success the registration is stored in pub->svcFactoryReg
+ * and the newly allocated factory is returned through svcFactory (the caller
+ * owns it). On failure the factory and its properties are released here and
+ * *svcFactory is left untouched.
+ *
+ * \return CELIX_SUCCESS, CELIX_ENOMEM if the factory cannot be allocated,
+ *         CELIX_SERVICE_EXCEPTION if the publication has no endpoint, or the
+ *         status reported by bundleContext_registerServiceFactory.
+ */
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
+	celix_status_t status = CELIX_SUCCESS;
+
+	/* Let's register the new service */
+
+	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+
+	if(pubEP==NULL){
+		printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	service_factory_pt factory = calloc(1, sizeof(*factory));
+	if(factory==NULL){
+		return CELIX_ENOMEM;
+	}
+	factory->handle = pub;
+	factory->getService = pubsub_topicPublicationGetService;
+	factory->ungetService = pubsub_topicPublicationUngetService;
+
+	properties_pt props = properties_create();
+	properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+	properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+	properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
+
+	status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
+
+	if(status != CELIX_SUCCESS){
+		/* Registration failed: neither the properties nor the factory were handed over */
+		properties_destroy(props);
+		free(factory);
+		printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %s).\n",
+			   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+			   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+	}
+	else{
+		*svcFactory = factory;
+	}
+
+	return status;
+}
+
+/**
+ * Unregisters the service factory registered by pubsub_topicPublicationStart.
+ *
+ * \return the status reported by serviceRegistration_unregister.
+ */
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
+	celix_status_t unregisterStatus = serviceRegistration_unregister(pub->svcFactoryReg);
+
+	return unregisterStatus;
+}
+
+/**
+ * Stamps ep with this publication's admin type, serializer type and endpoint
+ * URL, then appends it to the publication's endpoint list. Runs under tp_lock.
+ *
+ * Always returns CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
+	celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+	celixThreadMutex_lock(lock);
+
+	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
+	pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+
+	arrayList_add(pub->pub_ep_list,ep);
+
+	celixThreadMutex_unlock(lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Removes from the publication's endpoint list the first entry that
+ * pubsubEndpoint_equals reports as equal to ep. Runs under tp_lock.
+ *
+ * The entry found by the comparison is the one removed, so the call also works
+ * when ep is an equal endpoint stored at a different address than the listed one.
+ *
+ * Always returns CELIX_SUCCESS, also when no entry matched.
+ */
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+	for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
+		pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
+		if(pubsubEndpoint_equals(ep, e)) {
+			/* Remove the element that matched, not the caller's pointer */
+			arrayList_removeElement(pub->pub_ep_list,e);
+			break;
+		}
+	}
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Returns the result of arrayList_clone on the publication's endpoint list,
+ * taken while holding tp_lock.
+ */
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
+	celix_thread_mutex_t *lock = &(pub->tp_lock);
+
+	celixThreadMutex_lock(lock);
+	array_list_pt snapshot = arrayList_clone(pub->pub_ep_list);
+	celixThreadMutex_unlock(lock);
+
+	return snapshot;
+}
+
+
+/**
+ * Service factory getService callback.
+ *
+ * Hands out the publisher service bound to the requesting bundle. The bound
+ * service is created and stored in boundServices on the bundle's first request;
+ * later requests only increase its getCount. Runs under tp_lock.
+ *
+ * \param handle  the topic_publication_pt this factory belongs to.
+ * \param service receives the pubsub_publisher_t of the bundle, or NULL when
+ *                the bound service could not be created.
+ * \return CELIX_SUCCESS, or CELIX_ENOMEM when the bound service could not be
+ *         created.
+ */
+static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
+	celix_status_t  status = CELIX_SUCCESS;
+
+	topic_publication_pt publish = (topic_publication_pt)handle;
+
+	celixThreadMutex_lock(&(publish->tp_lock));
+
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	if(bound==NULL){
+		bound = pubsub_createPublishBundleBoundService(publish,bundle);
+		if(bound!=NULL){
+			hashMap_put(publish->boundServices,bundle,bound);
+		}
+	}
+	else{
+		bound->getCount++;
+	}
+
+	if(bound!=NULL){
+		*service = &bound->service;
+	}
+	else{
+		/* Never hand out an address computed from a NULL bound service */
+		*service = NULL;
+		status = CELIX_ENOMEM;
+	}
+
+	celixThreadMutex_unlock(&(publish->tp_lock));
+
+	return status;
+}
+
+/**
+ * Service factory ungetService callback.
+ *
+ * Decrements the getCount of the bundle's bound service and, once it reaches
+ * zero, destroys the bound service and drops it from boundServices. A call for
+ * a bundle without bound service is only logged. Runs under tp_lock.
+ *
+ * \param handle  the topic_publication_pt this factory belongs to.
+ * \param service always reset to NULL.
+ * \return always CELIX_SUCCESS.
+ */
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
+
+	topic_publication_pt tp = (topic_publication_pt)handle;
+
+	celixThreadMutex_lock(&(tp->tp_lock));
+
+	publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt)hashMap_get(tp->boundServices,bundle);
+	if(boundSvc==NULL){
+		long bundleId = -1;
+		bundle_getBundleId(bundle,&bundleId);
+		printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
+	}
+	else if(--boundSvc->getCount == 0){
+		/* Last user gone: tear the bound service down, then forget it */
+		pubsub_destroyPublishBundleBoundService(boundSvc);
+		hashMap_remove(tp->boundServices,bundle);
+	}
+
+	/* service should be never used for unget, so let's set the pointer to NULL */
+	*service = NULL;
+
+	celixThreadMutex_unlock(&(tp->tp_lock));
+
+	return CELIX_SUCCESS;
+}
+
+/*
+ * Sends msg as two frames: the header (always flagged ZFRAME_MORE) followed by
+ * the payload, which is flagged ZFRAME_MORE unless `last` is set.
+ *
+ * msg, its header and its payload are freed before returning, whatever the
+ * outcome. Returns false if a frame could not be created or sent.
+ */
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+
+	zframe_t* hdrFrame = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
+	zframe_t* bodyFrame = zframe_new(msg->payload, msg->payloadSize);
+	bool sent = (hdrFrame != NULL) && (bodyFrame != NULL);
+
+	delay_first_send_for_late_joiners();
+
+	if(zframe_send(&hdrFrame,zmq_socket, ZFRAME_MORE) == -1){
+		sent = false;
+	}
+
+	const int bodyFlags = last ? 0 : ZFRAME_MORE;
+	if(zframe_send(&bodyFrame,zmq_socket, bodyFlags) == -1){
+		sent = false;
+	}
+
+	if(!sent){
+		/* Frames that were not consumed by a send are released here */
+		zframe_destroy(&hdrFrame);
+		zframe_destroy(&bodyFrame);
+	}
+
+	free(msg->header);
+	free(msg->payload);
+	free(msg);
+
+	return sent;
+
+}
+
+/*
+ * Sends the queued parts of a multipart message in list order; the final part
+ * is sent with last=true. The list is emptied before returning.
+ *
+ * Sending stops at the first part that fails. The parts after it are not put on
+ * the wire but are still released here, because the list is cleared afterwards
+ * and nothing else would free them.
+ *
+ * Returns true only if every part was sent.
+ */
+static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
+
+	bool ret = true;
+
+	unsigned int i = 0;
+	unsigned int mp_num = arrayList_size(mp_msg_parts);
+	for(;i<mp_num;i++){
+		pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(mp_msg_parts,i);
+		if(ret){
+			/* send_pubsub_msg frees the part, whatever the outcome */
+			ret = send_pubsub_msg(zmq_socket, part, (i==mp_num-1));
+		}
+		else if(part!=NULL){
+			free(part->header);
+			free(part->payload);
+			free(part);
+		}
+	}
+	arrayList_clear(mp_msg_parts);
+
+	return ret;
+
+}
+
+/*
+ * pubsub_publisher_t send callback: a plain send is a multipart send whose
+ * single part is both the first and the last one.
+ */
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
+
+	const int singlePartFlags = PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG;
+
+	return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, singlePartFlags);
+
+}
+
+/*
+ * pubsub_publisher_t sendMultipart callback.
+ *
+ * Serializes inMsg with the serializer registered for msgTypeId and, depending
+ * on flags, either queues it as part of a multipart message or sends it:
+ *  - FIRST_MSG            starts a multipart message and queues the part
+ *  - PART_MSG             queues a further part
+ *  - LAST_MSG             queues the part and sends all queued parts
+ *  - FIRST_MSG|LAST_MSG   sends the message immediately (normal send)
+ *
+ * handle is the publish_bundle_bound_service_pt. Runs under the parent's
+ * tp_lock and the bound service's mp_lock.
+ *
+ * Returns 0 on success, -1 when no serializer is known for msgTypeId, -3 when a
+ * new multipart message is started while another one is in progress, -4 for a
+ * PART/LAST part without a preceding FIRST part or an invalid flags value.
+ * A failure while sending is only logged; the return value stays 0.
+ */
+static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
+
+	int status = 0;
+
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+
+	celixThreadMutex_lock(&(bound->parent->tp_lock));
+	celixThreadMutex_lock(&(bound->mp_lock));
+	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
+		printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
+		celixThreadMutex_unlock(&(bound->mp_lock));
+		celixThreadMutex_unlock(&(bound->parent->tp_lock));
+		return -3;
+	}
+
+	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
+
+	if (msgSer!= NULL) {
+		int major=0, minor=0;
+
+		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
+		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
+		msg_hdr->type = msgTypeId;
+
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
+			msg_hdr->major = major;
+			msg_hdr->minor = minor;
+		}
+
+		void *serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
+
+		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+		msg->header = msg_hdr;
+		msg->payload = (char*)serializedOutput;
+		msg->payloadSize = serializedOutputLen;
+		bool snd = true;
+
+		switch(flags){
+		case PUBSUB_PUBLISHER_FIRST_MSG:
+			bound->mp_send_in_progress = true;
+			arrayList_add(bound->mp_parts,msg);
+			break;
+		case PUBSUB_PUBLISHER_PART_MSG:
+			if(!bound->mp_send_in_progress){
+				printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
+				status = -4;
+			}
+			else{
+				arrayList_add(bound->mp_parts,msg);
+			}
+			break;
+		case PUBSUB_PUBLISHER_LAST_MSG:
+			if(!bound->mp_send_in_progress){
+				printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
+				status = -4;
+			}
+			else{
+				arrayList_add(bound->mp_parts,msg);
+				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
+				bound->mp_send_in_progress = false;
+			}
+			break;
+		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
+			snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
+			break;
+		default:
+			printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
+			status = -4;
+			break;
+		}
+
+		if(status==-4){
+			/* The message was neither queued nor sent, so it is still owned here:
+			 * release the header and the serialized payload together with it */
+			free(msg_hdr);
+			free(serializedOutput);
+			free(msg);
+		}
+
+		if(!snd){
+			printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
+		}
+
+	} else {
+		printf("PSA_ZMQ_TP: No msg serializer available for msg type id %u\n", msgTypeId);
+		status=-1;
+	}
+
+	celixThreadMutex_unlock(&(bound->mp_lock));
+	celixThreadMutex_unlock(&(bound->parent->tp_lock));
+
+	return status;
+
+}
+
+/*
+ * pubsub_publisher_t localMsgTypeIdForMsgType callback: the local id of a
+ * message type is the utils_stringHash of its name. handle is not used.
+ * Always returns 0.
+ */
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+	unsigned int typeHash = utils_stringHash(msgType);
+
+	*msgTypeId = typeHash;
+	return 0;
+}
+
+
+/*
+ * Returns a pseudo-random value in the closed range [min, max].
+ * If max is not greater than min, min is returned.
+ */
+static unsigned int rand_range(unsigned int min, unsigned int max){
+
+	if(max<=min){
+		return min;
+	}
+
+	/* random() yields values in [0, 2^31-1]; dividing by 2^31 keeps the factor in [0,1),
+	 * so the scaled offset can never reach (max-min+1) and the result never exceeds max */
+	double scaled = ((double)random())/2147483648.0;
+	double span = (double)max-(double)min+1.0;
+
+	return (unsigned int)(span*scaled) + min;
+
+}
+
+/*
+ * Allocates and initializes the publisher service bound to `bundle`:
+ * getCount starts at 1, the serializer map is requested from the publication's
+ * serializer (when there is one), the topic name is copied from the first
+ * endpoint of the publication and the pubsub_publisher_t callbacks are wired to
+ * the send functions of this file.
+ *
+ * Returns NULL if the allocation fails.
+ */
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
+	//PRECOND lock on tp->lock
+
+	publish_bundle_bound_service_pt svc = calloc(1, sizeof(*svc));
+	if (svc == NULL) {
+		return NULL;
+	}
+
+	svc->parent = tp;
+	svc->bundle = bundle;
+	svc->getCount = 1;
+	svc->mp_send_in_progress = false;
+	celixThreadMutex_create(&svc->mp_lock,NULL);
+
+	if(tp->serializer.svc != NULL){
+		tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&svc->msgTypes);
+	}
+
+	arrayList_create(&svc->mp_parts);
+
+	/* The topic name comes from the first publisher endpoint of the publication */
+	pubsub_endpoint_pt firstEP = (pubsub_endpoint_pt)arrayList_get(tp->pub_ep_list,0);
+	svc->topic=strdup(properties_get(firstEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	svc->service.handle = svc;
+	svc->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+	svc->service.send = pubsub_topicPublicationSend;
+	svc->service.sendMultipart = pubsub_topicPublicationSendMultipart;
+
+	return svc;
+}
+
+/*
+ * Releases a bound service: its serializer map (through the publication's
+ * serializer), the multipart queue, the topic string, the mutex and the struct.
+ *
+ * Parts still queued in mp_parts (a multipart message that was started but
+ * never completed with a LAST_MSG part) are freed here as well, since nothing
+ * else owns them once the list is gone.
+ */
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+
+	//PRECOND lock on tp->lock
+
+	celixThreadMutex_lock(&boundSvc->mp_lock);
+
+
+	if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
+		boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
+	}
+
+	if(boundSvc->mp_parts!=NULL){
+		unsigned int i = 0;
+		unsigned int pending = arrayList_size(boundSvc->mp_parts);
+		for(;i<pending;i++){
+			pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(boundSvc->mp_parts,i);
+			if(part!=NULL){
+				free(part->header);
+				free(part->payload);
+				free(part);
+			}
+		}
+		arrayList_destroy(boundSvc->mp_parts);
+	}
+
+	/* free(NULL) is a no-op, so no guard is needed */
+	free(boundSvc->topic);
+
+	celixThreadMutex_unlock(&boundSvc->mp_lock);
+	celixThreadMutex_destroy(&boundSvc->mp_lock);
+
+	free(boundSvc);
+
+}
+
+/*
+ * Sleeps FIRST_SEND_DELAY seconds the first time it is called and does nothing
+ * on any later call. The "already done" state is a function-local static.
+ */
+static void delay_first_send_for_late_joiners(){
+
+	static bool alreadyDelayed = false;
+
+	if(alreadyDelayed){
+		return;
+	}
+
+	printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
+	sleep(FIRST_SEND_DELAY);
+	alreadyDelayed = true;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
new file mode 100644
index 0000000..20e4a8e
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@ -0,0 +1,49 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "pubsub/publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+/* Opaque handle; the struct is defined in topic_publication.c. */
+typedef struct topic_publication *topic_publication_pt;
+
+/* Create binds a publisher socket on a port between basePort and maxPort and returns the publication through out; Destroy releases it. */
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+/* Add/remove a publisher endpoint to/from the publication's endpoint list. */
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+
+/* Start registers the publisher service factory (returned through svcFactory); Stop unregisters it. */
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+/* Returns a clone of the publication's endpoint list. */
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
new file mode 100644
index 0000000..46a1688
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -0,0 +1,761 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.c
+ *
+ *  \date       Oct 2, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "topic_subscription.h"
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include <errno.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "utils.h"
+#include "celix_errno.h"
+#include "constants.h"
+#include "version.h"
+
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+#include "pubsub_utils.h"
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+#include "zmq_crypto.h"
+
+#define MAX_CERT_PATH_LENGTH 512
+#endif
+
+#define POLL_TIMEOUT  	250
+#define ZMQ_POLL_TIMEOUT_MS_ENV 	"ZMQ_POLL_TIMEOUT_MS"
+
+#define PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC "PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC"
+#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR "1000"
+#define PSA_ZMQ_RECV_DEFAULT_TIMEOUT 1000
+
+
+/* State of one ZMQ topic subscription (opaque to users of topic_subscription.h). */
+struct topic_subscription{
+
+	zsock_t* zmq_socket;                 /* ZMQ_SUB socket created in pubsub_topicSubscriptionCreate */
+	zcert_t * zmq_cert;                  /* subscriber certificate; only assigned when BUILD_WITH_ZMQ_SECURITY is defined */
+	zcert_t * zmq_pub_cert;              /* publisher certificate; only assigned when BUILD_WITH_ZMQ_SECURITY is defined */
+	pthread_mutex_t socket_lock;         /* held around every use of zmq_socket (recv loop, connect, disconnect, destroy) */
+	service_tracker_pt tracker;          /* tracks subscriber services matching the topic/scope filter */
+	array_list_pt sub_ep_list;           /* subscriber endpoints added/removed through the Add/RemoveSubscriber calls */
+	celix_thread_t recv_thread;          /* thread running zmq_recv_thread_func */
+	bool running;                        /* loop condition of the receive thread */
+	celix_thread_mutex_t ts_lock;        /* held while touching sub_ep_list, servicesMap and nrSubscribers */
+	bundle_context_pt context;
+
+	struct {
+		const char* type;                /* NOTE(review): never assigned in this file */
+		pubsub_serializer_service_t *svc;
+	} serializer;
+
+	hash_map_pt servicesMap; // key = service, value = msg types map
+
+	/* URLs (heap copies) queued for connection; drained by the receive thread. */
+	celix_thread_mutex_t pendingConnections_lock;
+	array_list_pt pendingConnections;
+
+	/* URLs (heap copies) queued for disconnection; drained by the receive thread. */
+	array_list_pt pendingDisconnections;
+	celix_thread_mutex_t pendingDisconnections_lock;
+
+	unsigned int nrSubscribers;
+	int zmqReceiveTimeout;               /* microseconds passed to usleep() when no message is available */
+};
+
+/* One logical message as received from the socket: a header frame followed by a payload frame. */
+typedef struct complete_zmq_msg{
+	zframe_t* header;
+	zframe_t* payload;
+}* complete_zmq_msg_pt;
+
+/* Handle given to subscribers through the multipart callbacks. */
+typedef struct mp_handle{
+	hash_map_pt svc_msg_db;   /* msg type id -> message serializer of the receiving subscriber */
+	hash_map_pt rcv_msg_map;  /* msg type id -> msg_map_entry_pt of the additional (non-primary) parts */
+}* mp_handle_pt;
+
+/* A deserialized additional part of a multipart message. */
+typedef struct msg_map_entry{
+	bool retain;    /* set through pubsub_getMultipart; when true destroy_mp_handle does not free msgInst */
+	void* msgInst;  /* deserialized message instance */
+}* msg_map_entry_pt;
+
+/* Forward declarations of the file-local helpers defined further down in this file. */
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service);
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service);
+static void* zmq_recv_thread_func(void* arg);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static void sigusr1_sighandler(int signo);
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
+static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_pt mp_handle);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
+static unsigned int get_zmq_receive_timeout(bundle_context_pt context);
+
+
+/**
+ * Reads the receive back-off, in microseconds, from the bundle context property
+ * PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC.
+ *
+ * The default (PSA_ZMQ_RECV_DEFAULT_TIMEOUT) is returned when the property is
+ * missing, does not start with a number, is zero or negative, or does not fit
+ * in an int (the caller stores the result in an int field).
+ *
+ * @param context bundle context used for the property lookup.
+ * @return the timeout in microseconds, always > 0.
+ */
+static unsigned int get_zmq_receive_timeout(bundle_context_pt context) {
+	const char* timeout_str = NULL;
+	bundleContext_getPropertyWithDefault(context,
+										 PSA_ZMQ_RECEIVE_TIMEOUT_MICROSEC,
+										 PSA_ZMQ_RECV_DEFAULT_TIMEOUT_STR,
+										 &timeout_str);
+	if (timeout_str == NULL) {
+		// lookup failed and did not even hand back the default string
+		return PSA_ZMQ_RECV_DEFAULT_TIMEOUT;
+	}
+
+	// strtoul silently negates values with a leading '-', so reject those up front
+	if (strchr(timeout_str, '-') != NULL) {
+		return PSA_ZMQ_RECV_DEFAULT_TIMEOUT;
+	}
+
+	char* end = NULL;
+	errno = 0;
+	unsigned long parsed = strtoul(timeout_str, &end, 10);
+
+	bool noDigits = (end == timeout_str);
+	bool outOfRange = (errno == ERANGE) || (parsed > (unsigned long) INT_MAX);
+	if (noDigits || outOfRange || parsed == 0) {
+		// on error strtoul returns 0 (or ULONG_MAX with ERANGE); fall back to the default
+		return PSA_ZMQ_RECV_DEFAULT_TIMEOUT;
+	}
+
+	return (unsigned int) parsed;
+}
+
+/**
+ * Creates a topic subscription: a ZMQ_SUB socket subscribed to 'topic' (or to
+ * everything when topic equals PUBSUB_ANY_SUB_TOPIC), plus a service tracker
+ * for subscriber services matching the topic and, unless the default scope is
+ * used, the scope. Also installs the SIGUSR1 handler used to interrupt the
+ * receive thread.
+ *
+ * @param bundle_context  bundle context used for properties and the tracker.
+ * @param scope           subscriber scope; must not be NULL.
+ * @param topic           topic name; must not be NULL.
+ * @param best_serializer serializer service stored in the subscription.
+ * @param serType         currently not used by this function.
+ * @param out             receives the new subscription; must not be NULL.
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT on NULL arguments,
+ *         CELIX_ENOMEM when the subscription cannot be allocated,
+ *         CELIX_SERVICE_EXCEPTION when keys or the socket cannot be set up,
+ *         or the status of the failing tracker/customizer creation (in which
+ *         case *out is still set, as before).
+ */
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char *serType, topic_subscription_pt* out){
+	celix_status_t status = CELIX_SUCCESS;
+
+	// scope and topic are passed to strcmp/strncmp below, out is written unconditionally
+	if(scope==NULL || topic==NULL || out==NULL){
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+	if (keys_bundle_dir == NULL){
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	const char* keys_file_path = NULL;
+	const char* keys_file_name = NULL;
+	bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+	bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+	char sub_cert_path[MAX_CERT_PATH_LENGTH];
+	char pub_cert_path[MAX_CERT_PATH_LENGTH];
+
+	//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
+	snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
+	snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
+	free(keys_bundle_dir);
+
+	printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path);
+	printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path);
+
+	zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
+	if (sub_cert == NULL){
+		printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path);
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	zcert_t* pub_cert = zcert_load(pub_cert_path);
+	if (pub_cert == NULL){
+		zcert_destroy(&sub_cert);
+		printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path);
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+	const char* pub_key = zcert_public_txt(pub_cert);
+#endif
+
+	zsock_t* zmq_s = zsock_new (ZMQ_SUB);
+	if(zmq_s==NULL){
+#ifdef BUILD_WITH_ZMQ_SECURITY
+		zcert_destroy(&sub_cert);
+		zcert_destroy(&pub_cert);
+#endif
+
+		return CELIX_SERVICE_EXCEPTION;
+	}
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	zcert_apply (sub_cert, zmq_s);
+	zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
+#endif
+
+	if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
+		zsock_set_subscribe (zmq_s, "");
+	}
+	else{
+		zsock_set_subscribe (zmq_s, topic);
+	}
+
+	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
+	if(ts==NULL){
+		// release everything acquired so far instead of dereferencing NULL
+		zsock_destroy(&zmq_s);
+#ifdef BUILD_WITH_ZMQ_SECURITY
+		zcert_destroy(&sub_cert);
+		zcert_destroy(&pub_cert);
+#endif
+		*out = NULL;
+		return CELIX_ENOMEM;
+	}
+	ts->context = bundle_context;
+	ts->zmq_socket = zmq_s;
+	ts->running = false;
+	ts->nrSubscribers = 0;
+	ts->serializer.svc = best_serializer;
+	ts->zmqReceiveTimeout = get_zmq_receive_timeout(bundle_context);
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	ts->zmq_cert = sub_cert;
+	ts->zmq_pub_cert = pub_cert;
+#endif
+
+	celixThreadMutex_create(&ts->socket_lock, NULL);
+	celixThreadMutex_create(&ts->ts_lock,NULL);
+	arrayList_create(&ts->sub_ep_list);
+	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+
+	arrayList_create(&ts->pendingConnections);
+	arrayList_create(&ts->pendingDisconnections);
+	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+	celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+
+	char filter[128];
+	memset(filter,0,128);
+	if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
+		// default scope, means that subscriber has not defined a scope property
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+	} else {
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic,
+				PUBSUB_SUBSCRIBER_SCOPE,scope);
+	}
+
+	// Both calls are still made (as before), but the first failing status is
+	// reported instead of the arithmetic sum of two status codes.
+	service_tracker_customizer_pt customizer = NULL;
+	celix_status_t customizerStatus = serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
+	celix_status_t trackerStatus = serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
+	status = (customizerStatus != CELIX_SUCCESS) ? customizerStatus : trackerStatus;
+
+	struct sigaction actions;
+	memset(&actions, 0, sizeof(actions));
+	sigemptyset(&actions.sa_mask);
+	actions.sa_flags = 0;
+	actions.sa_handler = sigusr1_sighandler;
+
+	sigaction(SIGUSR1,&actions,NULL);
+
+	*out=ts;
+
+	return status;
+}
+
+/**
+ * Releases everything owned by the subscription: tracker, endpoint list,
+ * services map, both pending-URL lists (including any URL copies still queued
+ * in them), the ZMQ socket, the certificates (security builds) and the
+ * subscription itself.
+ *
+ * The subscription is expected to have been stopped already; this function
+ * does not join the receive thread.
+ *
+ * @param ts subscription to destroy; invalid after the call.
+ * @return CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	ts->running = false;
+	serviceTracker_destroy(ts->tracker);
+	arrayList_clear(ts->sub_ep_list);
+	arrayList_destroy(ts->sub_ep_list);
+	/* TODO: Destroy all the serializer maps? */
+	hashMap_destroy(ts->servicesMap,false,false);
+
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	// the queued URLs are strdup'ed copies that nobody else will free
+	while(!arrayList_isEmpty(ts->pendingConnections)) {
+		free(arrayList_remove(ts->pendingConnections, 0));
+	}
+	arrayList_destroy(ts->pendingConnections);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	while(!arrayList_isEmpty(ts->pendingDisconnections)) {
+		free(arrayList_remove(ts->pendingDisconnections, 0));
+	}
+	arrayList_destroy(ts->pendingDisconnections);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	celixThreadMutex_lock(&ts->socket_lock);
+	zsock_destroy(&(ts->zmq_socket));
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	zcert_destroy(&(ts->zmq_cert));
+	zcert_destroy(&(ts->zmq_pub_cert));
+#endif
+	celixThreadMutex_unlock(&ts->socket_lock);
+	celixThreadMutex_destroy(&ts->socket_lock);
+
+	celixThreadMutex_destroy(&ts->ts_lock);
+
+	free(ts);
+
+	return status;
+}
+
+/**
+ * Opens the service tracker and starts the receive thread.
+ *
+ * The running flag is only raised when the tracker was opened, and is lowered
+ * again when the thread could not be created, so it reflects whether a receive
+ * thread actually exists.
+ *
+ * @param ts subscription to start.
+ * @return status of serviceTracker_open, or of celixThread_create when the
+ *         tracker was opened successfully.
+ */
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
+	celix_status_t status = CELIX_SUCCESS;
+
+	status = serviceTracker_open(ts->tracker);
+
+	if(status==CELIX_SUCCESS){
+		// must be set before the thread starts: the thread loops on this flag
+		ts->running = true;
+		status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
+		if(status!=CELIX_SUCCESS){
+			ts->running = false;
+		}
+	}
+
+	return status;
+}
+
+/**
+ * Stops the receive thread (if one is running) and closes the service tracker.
+ *
+ * The thread is interrupted with SIGUSR1 and then joined. Signalling/joining is
+ * skipped when the subscription is not running, so that calling Stop twice or
+ * without a successful Start does not act on an invalid thread handle.
+ *
+ * @param ts subscription to stop.
+ * @return status of serviceTracker_close.
+ */
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
+	celix_status_t status = CELIX_SUCCESS;
+
+	bool wasRunning = ts->running;
+	ts->running = false;
+
+	if(wasRunning){
+		pthread_kill(ts->recv_thread.thread,SIGUSR1);
+
+		celixThread_join(ts->recv_thread,NULL);
+	}
+
+	status = serviceTracker_close(ts->tracker);
+
+	return status;
+}
+
+/**
+ * Connects the subscription socket to the publisher at pubURL while holding
+ * the socket lock.
+ *
+ * @return CELIX_SUCCESS when the socket is valid and the connect succeeded,
+ *         CELIX_SERVICE_EXCEPTION otherwise.
+ */
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
+	bool connected = false;
+
+	celixThreadMutex_lock(&ts->socket_lock);
+	if(zsock_is(ts->zmq_socket)){
+		connected = (zsock_connect(ts->zmq_socket,"%s",pubURL) == 0);
+	}
+	celixThreadMutex_unlock(&ts->socket_lock);
+
+	return connected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
+}
+
+/**
+ * Queues a copy of pubURL for connection; the receive thread connects to it
+ * on its next iteration and frees the copy.
+ *
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT when pubURL is NULL, or
+ *         CELIX_ENOMEM when the URL cannot be copied (nothing is queued then).
+ */
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	if (pubURL == NULL) {
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	char *url = strdup(pubURL);
+	if (url == NULL) {
+		// never queue NULL: the consumer hands the entry to a "%s" format
+		return CELIX_ENOMEM;
+	}
+
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	arrayList_add(ts->pendingConnections, url);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Queues a copy of pubURL for disconnection; the receive thread disconnects
+ * from it on its next iteration and frees the copy.
+ *
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT when pubURL is NULL, or
+ *         CELIX_ENOMEM when the URL cannot be copied (nothing is queued then).
+ */
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	if (pubURL == NULL) {
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	char *url = strdup(pubURL);
+	if (url == NULL) {
+		// never queue NULL: the consumer hands the entry to a "%s" format
+		return CELIX_ENOMEM;
+	}
+
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	arrayList_add(ts->pendingDisconnections, url);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Disconnects the subscription socket from the publisher at pubURL while
+ * holding the socket lock.
+ *
+ * @return CELIX_SUCCESS when the socket is valid and the disconnect succeeded,
+ *         CELIX_SERVICE_EXCEPTION otherwise.
+ */
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
+	bool disconnected = false;
+
+	celixThreadMutex_lock(&ts->socket_lock);
+	if(zsock_is(ts->zmq_socket)){
+		disconnected = (zsock_disconnect(ts->zmq_socket,"%s",pubURL) == 0);
+	}
+	celixThreadMutex_unlock(&ts->socket_lock);
+
+	return disconnected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
+}
+
+/**
+ * Appends subEP to the subscription's endpoint list under ts_lock.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+	celixThreadMutex_lock(&ts->ts_lock);
+	arrayList_add(ts->sub_ep_list,subEP);
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Increments the subscriber counter under ts_lock.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
+	celixThreadMutex_lock(&ts->ts_lock);
+	ts->nrSubscribers += 1;
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Removes subEP from the subscription's endpoint list under ts_lock.
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+	celixThreadMutex_lock(&ts->ts_lock);
+	arrayList_removeElement(ts->sub_ep_list,subEP);
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Decrements the subscriber counter under ts_lock.
+ *
+ * The counter is unsigned, so it is left at 0 instead of being decremented
+ * below zero (which would wrap around to UINT_MAX).
+ *
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	if (ts->nrSubscribers > 0) {
+		ts->nrSubscribers--;
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
+/* Returns the current subscriber counter. The value is read without taking ts_lock. */
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
+	unsigned int count = ts->nrSubscribers;
+	return count;
+}
+
+/* Returns the subscription's own endpoint list (not a copy); ts_lock is not taken. */
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+	array_list_pt endpoints = sub->sub_ep_list;
+	return endpoints;
+}
+
+/**
+ * Tracker callback for a newly tracked subscriber service.
+ *
+ * For a service not yet present in servicesMap, asks the serializer service to
+ * create the message-type map for the owning bundle and stores it under the
+ * service pointer. Runs under ts_lock.
+ *
+ * @param handle    the topic_subscription_pt given to the customizer.
+ * @param reference reference of the tracked service, used to find its bundle.
+ * @param service   the subscriber service instance (map key).
+ * @return CELIX_SERVICE_EXCEPTION when there is no serializer service or no
+ *         bundle, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+	topic_subscription_pt ts = handle;
+	celix_status_t result = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	bool alreadyKnown = hashMap_containsKey(ts->servicesMap, service);
+	if (!alreadyKnown) {
+		bundle_pt owner = NULL;
+		serviceReference_getBundle(reference, &owner);
+
+		if(ts->serializer.svc == NULL || owner == NULL){
+			printf("PSA_ZMQ_TS: Cannot register new subscriber.\n");
+			result = CELIX_SERVICE_EXCEPTION;
+		}
+		else{
+			hash_map_pt typeMap = NULL;
+			ts->serializer.svc->createSerializerMap(ts->serializer.svc->handle,owner,&typeMap);
+			if(typeMap != NULL){
+				hashMap_put(ts->servicesMap, service, typeMap);
+				printf("PSA_ZMQ_TS: New subscriber registered.\n");
+			}
+		}
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return result;
+}
+
+/**
+ * Tracker callback for a subscriber service that is no longer tracked.
+ *
+ * Removes the service from servicesMap and hands its message-type map back to
+ * the serializer service for destruction. Runs under ts_lock.
+ *
+ * @param handle    the topic_subscription_pt given to the customizer.
+ * @param reference unused.
+ * @param service   the subscriber service instance (map key).
+ * @return CELIX_SERVICE_EXCEPTION when the removed map is NULL or there is no
+ *         serializer service, CELIX_SUCCESS otherwise.
+ */
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+	topic_subscription_pt ts = handle;
+	celix_status_t result = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+	if (hashMap_containsKey(ts->servicesMap, service)) {
+		hash_map_pt typeMap = hashMap_remove(ts->servicesMap, service);
+		bool canDestroy = (typeMap != NULL) && (ts->serializer.svc != NULL);
+		if(!canDestroy){
+			printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+			result = CELIX_SERVICE_EXCEPTION;
+		}
+		else{
+			ts->serializer.svc->destroySerializerMap(ts->serializer.svc->handle,typeMap);
+			printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+		}
+	}
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return result;
+}
+
+
+/**
+ * Delivers one received message (primary part plus optional additional parts)
+ * to every subscriber service in sub->servicesMap, then frees all frames, the
+ * per-message wrappers and msg_list itself.
+ *
+ * For each subscriber the primary part is looked up by type id in that
+ * subscriber's serializer map, version-checked, deserialized and passed to the
+ * subscriber's receive callback together with multipart callbacks.
+ *
+ * The caller holds sub->ts_lock (see zmq_recv_thread_func).
+ *
+ * @param sub      subscription whose subscribers receive the message.
+ * @param msg_list list of complete_zmq_msg_pt with at least one element;
+ *                 ownership is taken by this function.
+ */
+static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+
+	complete_zmq_msg_pt first_msg = (complete_zmq_msg_pt)arrayList_get(msg_list,0);
+	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(first_msg->header);
+
+	/* The header frame comes straight from the network: make sure it is large
+	 * enough to hold the header struct before any of its fields is read. */
+	if (zframe_size(first_msg->header) < sizeof(*first_msg_hdr)) {
+		printf("PSA_ZMQ_TS: Primary message header too short (%zu bytes). NOT sending any part of the whole message.\n",zframe_size(first_msg->header));
+	}
+	else {
+		hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+		while (hashMapIterator_hasNext(iter)) {
+			hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+			pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
+			hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+
+			pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type);
+			if (msgSer == NULL) {
+				printf("PSA_ZMQ_TS: Primary message %u not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
+			}
+			else{
+				void *msgInst = NULL;
+				bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr);
+
+				if(validVersion){
+
+					celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(first_msg->payload), 0, &msgInst);
+
+					if (status == CELIX_SUCCESS) {
+						bool release = true;
+						/* NULL for single-part messages; the callbacks cope with a NULL handle */
+						mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
+						pubsub_multipart_callbacks_t mp_callbacks;
+						mp_callbacks.handle = mp_handle;
+						mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
+						mp_callbacks.getMultipart = pubsub_getMultipart;
+						subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+
+						/* the subscriber may clear 'release' to keep the message instance */
+						if(release){
+							msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst);
+						}
+						if(mp_handle!=NULL){
+							destroy_mp_handle(mp_handle);
+						}
+					}
+					else{
+						printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName);
+					}
+
+				}
+				else{
+					int major=0,minor=0;
+					version_getMajor(msgSer->msgVersion,&major);
+					version_getMinor(msgSer->msgVersion,&minor);
+					printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+							msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+				}
+
+			}
+		}
+		hashMapIterator_destroy(iter);
+	}
+
+	/* Release every frame and wrapper, whether or not the message was delivered. */
+	int i = 0;
+	for(;i<arrayList_size(msg_list);i++){
+		complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i);
+		zframe_destroy(&(c_msg->header));
+		zframe_destroy(&(c_msg->payload));
+		free(c_msg);
+	}
+
+	arrayList_destroy(msg_list);
+
+}
+
+/**
+ * Body of the receive thread.
+ *
+ * While sub->running is set: polls the socket (non-blocking) for a header
+ * frame, reads the matching payload and any further header/payload pairs of a
+ * multipart message, and hands the collected list to process_msg under
+ * ts_lock. When nothing is available it sleeps for zmqReceiveTimeout
+ * microseconds. After each iteration the pending connect/disconnect requests
+ * are applied.
+ *
+ * @param arg the topic_subscription_pt to serve.
+ * @return always NULL.
+ */
+static void* zmq_recv_thread_func(void * arg) {
+	topic_subscription_pt sub = (topic_subscription_pt) arg;
+
+	while (sub->running) {
+
+		celixThreadMutex_lock(&sub->socket_lock);
+
+		zframe_t* headerMsg = zframe_recv_nowait(sub->zmq_socket);
+		if (headerMsg == NULL) {
+			if(errno == EAGAIN) {
+				usleep(sub->zmqReceiveTimeout);
+			} else if (errno == EINTR) {
+				//It means we got a signal and we have to exit...
+				printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n");
+			} else {
+				perror("PSA_ZMQ_TS: header_recv thread");
+			}
+		}
+		else {
+
+			pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
+
+			if (zframe_more(headerMsg)) {
+
+				zframe_t* payloadMsg = zframe_recv(sub->zmq_socket);
+				if (payloadMsg == NULL) {
+					if (errno == EINTR) {
+						//It means we got a signal and we have to exit...
+						printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n");
+					} else {
+						perror("PSA_ZMQ_TS: payload_recv");
+					}
+					zframe_destroy(&headerMsg);
+				} else {
+
+					//Let's fetch all the messages from the socket
+					array_list_pt msg_list = NULL;
+					arrayList_create(&msg_list);
+					complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg));
+					firstMsg->header = headerMsg;
+					firstMsg->payload = payloadMsg;
+					arrayList_add(msg_list, firstMsg);
+
+					bool more = zframe_more(payloadMsg);
+					while (more) {
+
+						zframe_t* h_msg = zframe_recv(sub->zmq_socket);
+						if (h_msg == NULL) {
+							if (errno == EINTR) {
+								//It means we got a signal and we have to exit...
+								printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n");
+							} else {
+								perror("PSA_ZMQ_TS: h_recv");
+							}
+							break;
+						}
+
+						zframe_t* p_msg = zframe_recv(sub->zmq_socket);
+						if (p_msg == NULL) {
+							if (errno == EINTR) {
+								//It means we got a signal and we have to exit...
+								printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n");
+							} else {
+								perror("PSA_ZMQ_TS: p_recv");
+							}
+							zframe_destroy(&h_msg);
+							break;
+						}
+
+						complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg));
+						c_msg->header = h_msg;
+						c_msg->payload = p_msg;
+						arrayList_add(msg_list, c_msg);
+
+						if (!zframe_more(p_msg)) {
+							more = false;
+						}
+					}
+
+					//process_msg takes ownership of msg_list and of all frames in it
+					celixThreadMutex_lock(&sub->ts_lock);
+					process_msg(sub, msg_list);
+					celixThreadMutex_unlock(&sub->ts_lock);
+
+				}
+
+			} //zframe_more(headerMsg)
+			else {
+				//hdr points into the frame, so report first and release the frame afterwards
+				if (zframe_size(headerMsg) >= sizeof(*hdr)) {
+					printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
+				} else {
+					printf("PSA_ZMQ_TS: received a too short header frame without payload!\n");
+				}
+				//a zframe_t must be released with zframe_destroy, not free()
+				zframe_destroy(&headerMsg);
+			}
+
+		} // headerMsg != NULL
+		celixThreadMutex_unlock(&sub->socket_lock);
+		connectPendingPublishers(sub);
+		disconnectPendingPublishers(sub);
+	} // while
+
+	return NULL;
+}
+
+/* Drains the pending-connection queue: connects to each queued URL and frees the URL copy. */
+static void connectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingConnections_lock);
+	for(;;) {
+		if(arrayList_isEmpty(sub->pendingConnections)) {
+			break;
+		}
+		char * queuedUrl = arrayList_remove(sub->pendingConnections, 0);
+		pubsub_topicSubscriptionConnectPublisher(sub, queuedUrl);
+		free(queuedUrl);
+	}
+	celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+/* Drains the pending-disconnection queue: disconnects from each queued URL and frees the URL copy. */
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+	for(;;) {
+		if(arrayList_isEmpty(sub->pendingDisconnections)) {
+			break;
+		}
+		char * queuedUrl = arrayList_remove(sub->pendingDisconnections, 0);
+		pubsub_topicSubscriptionDisconnectPublisher(sub, queuedUrl);
+		free(queuedUrl);
+	}
+	celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+/**
+ * SIGUSR1 handler installed by pubsub_topicSubscriptionCreate; its only job is
+ * to exist so that a blocking receive is interrupted with EINTR.
+ *
+ * Only async-signal-safe calls are used (write(2) instead of printf), and
+ * errno is preserved so the interrupted code still sees its own error value.
+ */
+static void sigusr1_sighandler(int signo){
+	static const char msg[] = "PSA_ZMQ_TS: Topic subscription being shut down...\n";
+	int savedErrno = errno;
+
+	(void)signo;
+	/* best-effort log line: nothing useful can be done here if the write fails */
+	ssize_t written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
+	(void)written;
+
+	errno = savedErrno;
+}
+
+/**
+ * Decides whether a received message version is acceptable for the locally
+ * known message version.
+ *
+ * @param msgVersion locally known version; NULL never matches.
+ * @param hdr        received header carrying major/minor.
+ * @return true when the majors are equal and the received minor is greater
+ *         than or equal to the local minor (both local values are compared
+ *         after conversion to unsigned char).
+ */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+	if(msgVersion==NULL){
+		return false;
+	}
+
+	int localMajor=0;
+	int localMinor=0;
+	version_getMajor(msgVersion,&localMajor);
+	version_getMinor(msgVersion,&localMinor);
+
+	/* Different major means incompatible */
+	if(hdr->major!=((unsigned char)localMajor)){
+		return false;
+	}
+
+	/* Compatible only if the provider has a minor equals or greater (means compatible update) */
+	return (hdr->minor>=((unsigned char)localMinor));
+}
+
+/* Multipart callback: maps a message type name to its id via utils_stringHash. 'handle' is unused; always returns 0. */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+	unsigned int typeId = utils_stringHash(msgType);
+	*msgTypeId = typeId;
+	return 0;
+}
+
+/**
+ * Multipart callback: looks up an additional part of the current message.
+ *
+ * @param handle    mp_handle_pt of the current message, or NULL for
+ *                  single-part messages.
+ * @param msgTypeId type id of the requested part.
+ * @param retain    stored in the entry; a retained part is not freed by
+ *                  destroy_mp_handle.
+ * @param part      receives the deserialized part, or NULL on failure.
+ * @return 0 on success, -1 when handle is NULL, -2 when the part is not found.
+ */
+static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){
+
+	if(handle==NULL){
+		*part = NULL;
+		return -1;
+	}
+
+	mp_handle_pt mp = (mp_handle_pt)handle;
+	msg_map_entry_pt found = hashMap_get(mp->rcv_msg_map, (void*)(uintptr_t) msgTypeId);
+	if(found==NULL){
+		printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId);
+		*part=NULL;
+		return -2;
+	}
+
+	found->retain = retain;
+	*part = found->msgInst;
+	return 0;
+
+}
+
+/**
+ * Builds the multipart handle for one subscriber: every additional part
+ * (index >= 1) whose type is known to the subscriber and whose version is
+ * compatible is deserialized and stored by type id.
+ *
+ * @param svc_msg_db   the subscriber's map of type id -> message serializer.
+ * @param rcv_msg_list list of complete_zmq_msg_pt of the received message.
+ * @return a handle to be released with destroy_mp_handle, or NULL when the
+ *         message has a single part or the handle cannot be allocated.
+ */
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+
+	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
+		return NULL;
+	}
+
+	mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
+	if(mp_handle==NULL){
+		//callers and callbacks already treat a NULL handle as "no additional parts"
+		return NULL;
+	}
+	mp_handle->svc_msg_db = svc_msg_db;
+	mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
+
+	int i=1; //We skip the first message, it will be handle differently
+	for(;i<arrayList_size(rcv_msg_list);i++){
+		complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
+
+		//network input: skip parts whose header frame cannot hold a full header
+		if(zframe_size(c_msg->header) < sizeof(*header)){
+			continue;
+		}
+
+		pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type));
+
+		if (msgSer!= NULL) {
+			void *msgInst = NULL;
+
+			bool validVersion = checkVersion(msgSer->msgVersion,header);
+
+			if(validVersion){
+				celix_status_t status = msgSer->deserialize(msgSer, (const void*)zframe_data(c_msg->payload), 0, &msgInst);
+
+				if(status == CELIX_SUCCESS){
+					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
+					if(entry==NULL){
+						//cannot store the part, so do not leak the deserialized instance
+						msgSer->freeMsg(msgSer,msgInst);
+						continue;
+					}
+					entry->msgInst = msgInst;
+
+					//hashMap_put hands back the value it replaced: a second part of the
+					//same type would otherwise leak the first entry and its instance
+					msg_map_entry_pt replaced = hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry);
+					if(replaced!=NULL){
+						msgSer->freeMsg(msgSer,replaced->msgInst);
+						free(replaced);
+					}
+				}
+			}
+		}
+	}
+
+	return mp_handle;
+
+}
+
+/**
+ * Releases a handle created by create_mp_handle: frees every stored part that
+ * was not retained by the subscriber, the map entries, the map and the handle.
+ *
+ * @param mp_handle handle to destroy; must not be NULL.
+ */
+static void destroy_mp_handle(mp_handle_pt mp_handle){
+
+	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
+	while(hashMapIterator_hasNext(iter)){
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry);
+		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
+		pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId);
+
+		if(msgSer!=NULL){
+			if(!msgEntry->retain){
+				/* Pass the serializer itself, exactly as process_msg does for the
+				 * primary part and as both deserialize calls in this file do. */
+				msgSer->freeMsg(msgSer,msgEntry->msgInst);
+			}
+		}
+		else{
+			printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",msgId);
+		}
+
+		free(msgEntry);
+	}
+	hashMapIterator_destroy(iter);
+
+	hashMap_destroy(mp_handle->rcv_msg_map,false,false);
+	free(mp_handle);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
new file mode 100644
index 0000000..6dca4e5
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/topic_subscription.h
@@ -0,0 +1,60 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+#include "pubsub_serializer.h"
+
+/* Opaque handle: this header only forward-declares struct topic_subscription. */
+typedef struct topic_subscription* topic_subscription_pt;
+
+/* Lifecycle: Create returns the new subscription through 'out'; Start/Stop control
+ * the subscription's activity; Destroy releases it. */
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char* serType, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
+
+/* Queue a publisher URL for a later connect/disconnect. */
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL);
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL);
+
+/* Connect to / disconnect from a publisher URL. */
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL);
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL);
+
+/* Add/remove a subscriber endpoint to/from the subscription. */
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
+
+/* Accessors for the subscriber endpoint list and the subscriber counter. */
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub);
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
new file mode 100644
index 0000000..fe444bd
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
@@ -0,0 +1,281 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * zmq_crypto.c
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "zmq_crypto.h"
+
+#include <zmq.h>
+#include <openssl/conf.h>
+#include <openssl/evp.h>
+#include <openssl/err.h>
+
+#include <string.h>
+
+#define MAX_FILE_PATH_LENGTH 512
+#define ZMQ_KEY_LENGTH 40
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+static char* read_file_content(char* filePath, char* fileName);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
+
+/**
+ * Builds a zcert_t from an AES-256-CBC encrypted certificate file.
+ *
+ * keysFilePath / keysFileName locate the file holding the "aes_key" and
+ * "aes_iv" entries; NULL selects DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME.
+ * The SHA-256 digests of both values are used as key and iv to decrypt the
+ * content of file_path. The decrypted text must contain /curve/public-key and
+ * /curve/secret-key entries, each a Z85 string of ZMQ_KEY_LENGTH characters.
+ *
+ * Returns NULL on any failure.
+ * Caller is responsible for freeing by calling zcert_destroy(zcert_t** cert);
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
+{
+	if (file_path == NULL){
+		return NULL;
+	}
+
+	if (keysFilePath == NULL){
+		keysFilePath = DEFAULT_KEYS_FILE_PATH;
+	}
+
+	if (keysFileName == NULL){
+		keysFileName = DEFAULT_KEYS_FILE_NAME;
+	}
+
+	char* keys_data = read_file_content(keysFilePath, keysFileName);
+	if (keys_data == NULL){
+		return NULL;
+	}
+
+	char *key = NULL;
+	char *iv = NULL;
+	parse_key_lines(keys_data, &key, &iv);
+	free(keys_data);
+
+	if (key == NULL || iv == NULL){
+		free(key);
+		free(iv);
+
+		printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+		return NULL;
+	}
+
+	// The raw key/iv strings are only needed to derive the digests.
+	unsigned char key_digest[EVP_MAX_MD_SIZE];
+	unsigned char iv_digest[EVP_MAX_MD_SIZE];
+	int key_digest_len = generate_sha256_hash(key, key_digest);
+	int iv_digest_len = generate_sha256_hash(iv, iv_digest);
+	free(key);
+	free(iv);
+
+	if (key_digest_len < AES_KEY_LENGTH || iv_digest_len < AES_IV_LENGTH){
+		printf("CRYPTO: Hashing AES key and/or AES iv failed!\n");
+		return NULL;
+	}
+
+	zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
+	if (encoded_secret == NULL){
+		return NULL;
+	}
+
+	// decrypt() takes an int length; also reject empty files up front.
+	size_t encoded_secret_size = zchunk_size (encoded_secret);
+	if (encoded_secret_size == 0 || encoded_secret_size > (size_t) INT_MAX - EVP_MAX_BLOCK_LENGTH - 1){
+		zchunk_destroy (&encoded_secret);
+		printf("CRYPTO: Encoded file '%s' has an unsupported size!\n", file_path);
+		return NULL;
+	}
+
+	// Heap buffer instead of a VLA: room for one extra cipher block that the
+	// EVP decrypt calls may write, plus the terminating '\0' added below.
+	size_t decrypted_capacity = encoded_secret_size + EVP_MAX_BLOCK_LENGTH;
+	unsigned char* decryptedtext = malloc(decrypted_capacity + 1);
+	if (decryptedtext == NULL){
+		zchunk_destroy (&encoded_secret);
+		return NULL;
+	}
+
+	int decryptedtext_len = decrypt(zchunk_data (encoded_secret), (int) encoded_secret_size, key_digest, iv_digest, decryptedtext);
+	zchunk_destroy (&encoded_secret);
+
+	EVP_cleanup();
+
+	// Never index the buffer with a length that is not known to be in range.
+	if (decryptedtext_len <= 0 || (size_t) decryptedtext_len > decrypted_capacity){
+		free(decryptedtext);
+		printf("CRYPTO: Decrypting '%s' failed!\n", file_path);
+		return NULL;
+	}
+	decryptedtext[decryptedtext_len] = '\0';
+
+	// The public and private keys are retrieved
+	char* public_text = NULL;
+	char* secret_text = NULL;
+
+	extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+	free(decryptedtext);
+
+	zcert_t* cert_loaded = NULL;
+	if (public_text != NULL && secret_text != NULL){
+		byte public_key [32] = { 0 };
+		byte secret_key [32] = { 0 };
+
+		// A Z85 string of ZMQ_KEY_LENGTH characters decodes to exactly 32 bytes;
+		// anything else would leave the key buffers partially filled.
+		if (strlen(public_text) == ZMQ_KEY_LENGTH
+				&& strlen(secret_text) == ZMQ_KEY_LENGTH
+				&& zmq_z85_decode (public_key, public_text) != NULL
+				&& zmq_z85_decode (secret_key, secret_text) != NULL){
+			cert_loaded = zcert_new_from(public_key, secret_key);
+		} else {
+			printf("CRYPTO: Decoding public / secret key failed!\n");
+		}
+	}
+
+	free(public_text);
+	free(secret_text);
+
+	return cert_loaded;
+}
+
+/**
+ * Computes the SHA-256 digest of the NUL-terminated string text.
+ *
+ * digest receives the result and must have room for a SHA-256 digest
+ * (callers in this file pass EVP_MAX_MD_SIZE bytes).
+ *
+ * Returns the number of digest bytes written, or 0 when an argument is NULL
+ * or any OpenSSL call fails.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+	unsigned int digest_len = 0;
+
+	if (text == NULL || digest == NULL){
+		return 0;
+	}
+
+	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+	if (mdctx == NULL){
+		return 0;
+	}
+
+	// The EVP digest calls return 1 on success; stop at the first failure.
+	if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+			|| EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+			|| EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+		digest_len = 0;
+	}
+
+	EVP_MD_CTX_free(mdctx);
+
+	return (int) digest_len;
+}
+
+/**
+ * Decrypts ciphertext_len bytes of ciphertext with AES-256-CBC.
+ *
+ * key and iv are passed unchanged to EVP_DecryptInit_ex. plaintext receives
+ * the output; per the OpenSSL documentation the output buffer should have room
+ * for ciphertext_len plus one cipher block.
+ *
+ * Returns the number of plaintext bytes written, or 0 when an argument is
+ * invalid or any OpenSSL step fails. Returning 0 rather than a negative value
+ * keeps the result safe to use as a length or index.
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+	int len = 0;
+	int plaintext_len = 0;
+
+	if (ciphertext == NULL || ciphertext_len <= 0 || key == NULL || iv == NULL || plaintext == NULL){
+		return 0;
+	}
+
+	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+	if (ctx == NULL){
+		return 0;
+	}
+
+	// Each EVP call returns 1 on success; a failed step must not contribute
+	// a stale or uninitialised length to the result.
+	if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+			&& EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+		plaintext_len = len;
+		if (EVP_DecryptFinal_ex(ctx, plaintext + plaintext_len, &len) == 1){
+			plaintext_len += len;
+		} else {
+			plaintext_len = 0;
+		}
+	}
+
+	EVP_CIPHER_CTX_free(ctx);
+
+	return plaintext_len;
+}
+
+/**
+ * Reads the complete content of the file filePath/fileName.
+ *
+ * Returns a newly allocated, NUL-terminated copy of the content, or NULL when
+ * the combined path does not fit in MAX_FILE_PATH_LENGTH, the file does not
+ * exist, or it cannot be opened or read.
+ * Caller is responsible for freeing the returned value
+ */
+static char* read_file_content(char* filePath, char* fileName){
+
+	char fileNameWithPath[MAX_FILE_PATH_LENGTH];
+	int written = snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+	if (written < 0 || written >= MAX_FILE_PATH_LENGTH){
+		// A truncated path could silently refer to a different file.
+		printf("CRYPTO: Keys file path '%s/%s' is too long!\n", filePath, fileName);
+		return NULL;
+	}
+
+	if (!zsys_file_exists(fileNameWithPath)){
+		printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	zfile_t* keys_file = zfile_new (filePath, fileName);
+	if (keys_file == NULL){
+		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	if (zfile_input (keys_file) != 0){
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	// A negative size must not be converted to a huge unsigned read length.
+	ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
+	zchunk_t* keys_chunk = NULL;
+	if (keys_file_size >= 0){
+		keys_chunk = zfile_read (keys_file, (size_t) keys_file_size, 0);
+	}
+	if (keys_chunk == NULL){
+		zfile_close(keys_file);
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	char* keys_data = zchunk_strdup(keys_chunk);
+	zchunk_destroy(&keys_chunk);
+	zfile_close(keys_file);
+	zfile_destroy (&keys_file);
+
+	return keys_data;
+}
+
+/**
+ * Splits keysBuffer into '\n'-separated lines (modifying the buffer in place)
+ * and hands each line to parse_key_line until both *key and *iv are set or
+ * no lines are left. At least the first line is always parsed.
+ */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+	char *tokenState = NULL;
+
+	for (char *currentLine = strtok_r(keysBuffer, "\n", &tokenState);
+			currentLine != NULL;
+			currentLine = strtok_r(NULL, "\n", &tokenState)) {
+
+		parse_key_line(currentLine, key, iv);
+
+		// Stop tokenizing as soon as both values are known.
+		if (*key != NULL && *iv != NULL){
+			break;
+		}
+	}
+}
+
+/**
+ * Parses one "name:value" line. The first ':' in line is overwritten with
+ * '\0' to split it. When name equals KEY_TO_GET and *key is still NULL, *key
+ * receives a copy of at most AES_KEY_LENGTH characters of value; otherwise,
+ * when name equals IV_TO_GET and *iv is still NULL, *iv receives a copy of at
+ * most AES_IV_LENGTH characters. Lines without ':' or with an empty name or
+ * value are ignored.
+ */
+static void parse_key_line(char *line, char **key, char **iv){
+	char* separator = strchr(line, ':');
+	if (separator == NULL){
+		return;
+	}
+
+	// Terminate the name part; the value starts right behind the separator.
+	*separator = '\0';
+	const char *name = line;
+	const char *value = separator + 1;
+
+	if (*name == '\0' || *value == '\0'){
+		return;
+	}
+
+	if (*key == NULL && strcmp(name, KEY_TO_GET) == 0){
+		*key = strndup(value, AES_KEY_LENGTH);
+		return;
+	}
+
+	if (*iv == NULL && strcmp(name, IV_TO_GET) == 0){
+		*iv = strndup(value, AES_IV_LENGTH);
+	}
+}
+
+/**
+ * Parses inputlen bytes of input as zconfig text and copies the values of
+ * /curve/public-key and /curve/secret-key.
+ *
+ * On success *publicKey and *secretKey point to newly allocated strings that
+ * the caller must free. On any failure neither output is modified.
+ */
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
+	// A non-positive length would turn into a huge size_t in zchunk_new.
+	if (input == NULL || inputlen <= 0 || publicKey == NULL || secretKey == NULL){
+		printf("CRYPTO: Failed to create zchunk\n");
+		return;
+	}
+
+	// Load decrypted text buffer
+	zchunk_t* secret_decrypted = zchunk_new(input, (size_t) inputlen);
+	if (secret_decrypted == NULL){
+		printf("CRYPTO: Failed to create zchunk\n");
+		return;
+	}
+
+	zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
+	zchunk_destroy (&secret_decrypted);
+	if (secret_config == NULL){
+		printf("CRYPTO: Failed to create zconfig\n");
+		return;
+	}
+
+	// Extract public and secret key from text buffer
+	char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
+	char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
+
+	if (public_text == NULL || secret_text == NULL){
+		zconfig_destroy(&secret_config);
+		printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+		return;
+	}
+
+	// Copy before destroying the config, which owns public_text / secret_text.
+	char* public_copy = strndup(public_text, ZMQ_KEY_LENGTH + 1);
+	char* secret_copy = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
+	zconfig_destroy(&secret_config);
+
+	// Hand out both keys or none, so callers never see a half-filled pair.
+	if (public_copy == NULL || secret_copy == NULL){
+		free(public_copy);
+		free(secret_copy);
+		printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+		return;
+	}
+
+	*publicKey = public_copy;
+	*secretKey = secret_copy;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
new file mode 100644
index 0000000..f1a990f
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * zmq_crypto.h
+ *
+ *  \date       Dec 2, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef ZMQ_CRYPTO_H_
+#define ZMQ_CRYPTO_H_
+
+#include <czmq.h>
+
+#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
+#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
+#define DEFAULT_KEYS_FILE_PATH "/etc/"
+#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
+
+/**
+ * Loads a zcert_t from the encrypted certificate file at file_path, using the AES key and iv
+ * stored in the keys file keysFilePath/keysFileName (NULL selects DEFAULT_KEYS_FILE_PATH /
+ * DEFAULT_KEYS_FILE_NAME). Returns NULL when the keys or the certificate file cannot be loaded.
+ * The caller owns the returned certificate and must release it with zcert_destroy().
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path);
+/**
+ * Writes the SHA-256 digest of the NUL-terminated string text into digest and returns the
+ * digest length in bytes. digest must be large enough for a SHA-256 digest.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest);
+/**
+ * Decrypts ciphertext_len bytes of ciphertext with AES-256-CBC using key and iv, writing the
+ * result to plaintext. Returns the number of plaintext bytes written.
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext);
+
+#endif

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_api/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/CMakeLists.txt b/bundles/pubsub/pubsub_api/CMakeLists.txt
new file mode 100644
index 0000000..6ed6956
--- /dev/null
+++ b/bundles/pubsub/pubsub_api/CMakeLists.txt
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#api target
+#INTERFACE library: it has no sources of its own, it only carries the include directories below
+add_library(pubsub_api INTERFACE)
+
+#headers come from the source tree while building and from include/celix/pubsub once installed
+target_include_directories(pubsub_api INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+    $<INSTALL_INTERFACE:include/celix/pubsub>
+)
+
+#install api
+#the target is added to the "celix" export set; the headers are copied as part of the pubsub component
+install(TARGETS pubsub_api EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
+install(DIRECTORY include/ DESTINATION include/celix/pubsub COMPONENT pubsub)
+
+#Setup target aliases to match external usage
+add_library(Celix::pubsub_api ALIAS pubsub_api)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
new file mode 100644
index 0000000..9f7f3b6
--- /dev/null
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -0,0 +1,87 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * publisher.h
+ *
+ *  \date       Jan 7, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef __PUBSUB_PUBLISHER_H_
+#define __PUBSUB_PUBLISHER_H_
+
+#include <stdlib.h>
+
+#define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION	    "2.0.0"
+ 
+//properties
+#define PUBSUB_PUBLISHER_TOPIC                  "topic"
+#define PUBSUB_PUBLISHER_SCOPE                  "scope"
+#define PUBSUB_PUBLISHER_CONFIG                 "pubsub.config"
+ 
+#define PUBSUB_PUBLISHER_SCOPE_DEFAULT			"default"
+//flags
+#define PUBSUB_PUBLISHER_FIRST_MSG  01
+#define PUBSUB_PUBLISHER_PART_MSG   02
+#define PUBSUB_PUBLISHER_LAST_MSG   04
+
+/**
+ * Pairs a release function with an opaque handle.
+ * NOTE(review): presumably the handle is the value to pass as the second argument of release,
+ * and buf is the buffer to be released; the users of this struct are not visible in this header - confirm.
+ */
+struct pubsub_release_callback_struct {
+    void *handle;
+    void (*release)(char *buf, void *handle);
+};
+typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
+typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
+ 
+ 
+/**
+ * Publisher service struct: function pointers for resolving message type ids and for sending
+ * messages. handle is the first argument of every function in this struct.
+ */
+struct pubsub_publisher {
+    void *handle;
+ 
+    /**
+     * Every msg is identifiable by its msg type string. Because msg type strings are not preferable performance-wise (string compares),
+     * a "local" (int / platform dependent) unique id is generated at runtime
+     * with use of a distributed key/value store or communication between the participating parties.
+     * This is called the local message type id. It can be requested with the localMsgTypeIdForMsgType method.
+     * When the call is successful the msgTypeId is always greater than 0. (Note: this can be used to specify/detect uninitialized msg type ids in the consumer code).
+     *
+     * Returns 0 on success.
+     */
+    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgTypeId);
+  
+    /**
+     * send is an async function, but the msg can be safely deleted after send returns.
+     * Returns 0 on success.
+     */
+    int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
+ 
+  
+    /**
+     * sendMultipart is an async function, but the msg can be safely deleted after sendMultipart returns.
+     * The first (primary) message of a multipart message must have the flag PUBSUB_PUBLISHER_FIRST_MSG.
+     * The last message of a multipart message must have the flag PUBSUB_PUBLISHER_LAST_MSG.
+     * NOTE(review): presumably messages in between carry PUBSUB_PUBLISHER_PART_MSG - confirm against the implementation.
+     * Returns 0 on success.
+     */
+    int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void *msg, int flags);
+ 
+};
+typedef struct pubsub_publisher pubsub_publisher_t;
+typedef struct pubsub_publisher* pubsub_publisher_pt;
+
+#endif // __PUBSUB_PUBLISHER_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
new file mode 100644
index 0000000..ca6d4d1
--- /dev/null
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -0,0 +1,74 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * subscriber.h
+ *
+ *  \date       Jan 7, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef __PUBSUB_SUBSCRIBER_H_
+#define __PUBSUB_SUBSCRIBER_H_
+
+#include <stdbool.h>
+
+#define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
+ 
+//properties
+#define PUBSUB_SUBSCRIBER_TOPIC                "topic"
+#define PUBSUB_SUBSCRIBER_SCOPE                "scope"
+#define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
+
+#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
+ 
+/**
+ * Callbacks handed to a subscriber's receive function; handle is the first argument of both.
+ * localMsgTypeIdForMsgType resolves a msg type string to its local msg type id.
+ * getMultipart retrieves a part of a multipart message by msg type id; retain=true keeps that
+ * part in memory beyond the receive call (see the receive documentation in this header).
+ * NOTE(review): the return convention (0 on success) is assumed from the publisher API - confirm.
+ */
+struct pubsub_multipart_callbacks_struct {
+    void *handle;
+    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgId);
+    int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, void **part);
+};
+typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
+typedef struct pubsub_multipart_callbacks_struct* pubsub_multipart_callbacks_pt;
+ 
+/**
+ * Subscriber service struct: holds the receive callback and the handle passed to it.
+ */
+struct pubsub_subscriber_struct {
+    void *handle;
+     
+    /**
+     * When a new message for a topic is available, receive will be called.
+     *
+     * msgType contains the fully qualified name of the type and msgTypeId is a local id which represents the type for performance reasons.
+     * release can be used to instruct the pubsubadmin to release (free) the message when the receive function returns. Set it to false to take
+     * over ownership of the msg (i.e. take the responsibility to free it).
+     *
+     * The callbacks argument is only valid inside the receive function; use the getMultipart callback, with retain=true, to keep multipart messages in memory.
+     * Results of the localMsgTypeIdForMsgType callback are valid during the complete lifecycle of the component, not just a single receive call.
+     *
+     * Returning 0 implies successful handling. If the return value is not 0, the msg will always be released by the pubsubadmin.
+     *
+     * This method can be NULL.
+     */
+    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+
+};
+typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
+typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt;
+
+
+#endif //  __PUBSUB_SUBSCRIBER_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
new file mode 100644
index 0000000..9d5f074
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#external dependencies; REQUIRED makes configuration fail when either one is missing
+find_package(CURL REQUIRED)
+find_package(Jansson REQUIRED)
+
+#etcd based pubsub discovery bundle
+add_celix_bundle(celix_pubsub_discovery_etcd
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
+    VERSION "1.0.0"
+    SOURCES
+        src/psd_activator.c
+        src/pubsub_discovery_impl.c
+        src/etcd_common.c
+        src/etcd_watcher.c
+        src/etcd_writer.c
+)
+#bundle-private headers live in src; curl / jansson headers are added as SYSTEM include directories
+target_include_directories(celix_pubsub_discovery_etcd PRIVATE src)
+target_include_directories(celix_pubsub_discovery_etcd SYSTEM PRIVATE 
+	${CURL_INCLUDE_DIR}
+	${JANSSON_INCLUDE_DIR}
+)
+target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+
+#install the bundle as part of the pubsub component and add it to the "celix" export set
+install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
+
+#Setup target alias to match external usage
+add_library(Celix::pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_common.c b/bundles/pubsub/pubsub_discovery/src/etcd_common.c
new file mode 100644
index 0000000..c757801
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_common.c
@@ -0,0 +1,82 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include <curl/curl.h>
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+#define MAX_ROOTNODE_LENGTH		128
+#define MAX_LOCALNODE_LENGTH 	4096
+#define MAX_FIELD_LENGTH		128
+
+#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+
+/**
+ * Initialises the etcd client used by the pubsub discovery.
+ *
+ * The server address is read from the bundle context property CFG_ETCD_SERVER_IP
+ * and the port from CFG_ETCD_SERVER_PORT. DEFAULT_ETCD_SERVER_IP and
+ * DEFAULT_ETCD_SERVER_PORT are used when a property is missing; the default port
+ * is also used when the configured value is not a decimal number in 1..65535.
+ *
+ * Returns CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when etcd_init fails.
+ */
+celix_status_t etcdCommon_init(bundle_context_pt context) {
+    const char* etcd_server = NULL;
+    const char* etcd_port_string = NULL;
+    int etcd_port = DEFAULT_ETCD_SERVER_PORT;
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
+        etcd_server = DEFAULT_ETCD_SERVER_IP;
+    }
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) == CELIX_SUCCESS) && etcd_port_string) {
+        char* endptr = NULL;
+        errno = 0;
+        // Parse into a long first so out-of-range values are not silently narrowed to int.
+        long parsed_port = strtol(etcd_port_string, &endptr, 10);
+        // endptr == etcd_port_string means no digits were consumed (e.g. an empty string).
+        if (errno == 0 && endptr != etcd_port_string && *endptr == '\0'
+                && parsed_port > 0 && parsed_port <= 65535) {
+            etcd_port = (int) parsed_port;
+        }
+    }
+
+    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
+
+    if (etcd_init(etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_discovery/src/etcd_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_discovery/src/etcd_common.h b/bundles/pubsub/pubsub_discovery/src/etcd_common.h
new file mode 100644
index 0000000..7a3e7b6
--- /dev/null
+++ b/bundles/pubsub/pubsub_discovery/src/etcd_common.h
@@ -0,0 +1,28 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_COMMON_H_
+#define ETCD_COMMON_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+/**
+ * Initialises the etcd client: reads the etcd server address and port from the bundle context
+ * (falling back to built-in defaults) and calls etcd_init.
+ * Returns CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when etcd_init fails.
+ */
+celix_status_t etcdCommon_init(bundle_context_pt context);
+
+#endif /* ETCD_COMMON_H_ */


Mime
View raw message