celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [3/3] celix git commit: CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.
Date Tue, 06 Nov 2018 11:44:45 GMT
CELIX-454: Refactors PubSub API. Multipart is no longer part of the current API.

Multipart support can come back, but then in a separate service (i.e. pubsub_multipart_subscriber / pubsub_multipart_publisher).
Also
 - remove multipart examples
 - Add optional init function to the subscriber API, making it possible for a subscriber to tweak the receiver thread
 - Initial support for static zmq bind, discover and connect urls to use for (among others) testing


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b2548c84
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b2548c84
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b2548c84

Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: b2548c84502e389b4e1565b9895e16459e9ef886
Parents: 68f69f8
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Tue Nov 6 12:41:44 2018 +0100
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Tue Nov 6 12:41:44 2018 +0100

----------------------------------------------------------------------
 bundles/pubsub/examples/CMakeLists.txt          |   40 -
 .../pubsub/examples/mp_pubsub/CMakeLists.txt    |   23 -
 .../examples/mp_pubsub/common/include/ew.h      |   53 -
 .../examples/mp_pubsub/common/include/ide.h     |   49 -
 .../mp_pubsub/common/include/kinematics.h       |   55 -
 .../mp_pubsub/msg_descriptors/msg_ew.descriptor |    9 -
 .../msg_descriptors/msg_ide.descriptor          |    9 -
 .../msg_descriptors/msg_kinematics.descriptor   |   10 -
 .../examples/mp_pubsub/publisher/CMakeLists.txt |   43 -
 .../private/include/mp_publisher_private.h      |   58 -
 .../publisher/private/src/mp_pub_activator.c    |  144 --
 .../publisher/private/src/mp_publisher.c        |  160 --
 .../mp_pubsub/subscriber/CMakeLists.txt         |   43 -
 .../private/include/mp_subscriber_private.h     |   50 -
 .../subscriber/private/src/mp_sub_activator.c   |  108 --
 .../subscriber/private/src/mp_subscriber.c      |  119 --
 .../private/include/pubsub_subscriber_private.h |    2 +-
 .../subscriber/private/src/pubsub_subscriber.c  |    2 +-
 bundles/pubsub/mock/src/publisher_mock.cc       |   14 -
 .../pubsub_admin_udp_mc/src/psa_activator.c     |    6 +-
 .../src/pubsub_udpmc_admin.c                    |   12 +-
 .../src/pubsub_udpmc_admin.h                    |    8 +-
 .../src/pubsub_udpmc_topic_receiver.c           |    7 +-
 .../src/pubsub_udpmc_topic_sender.c             |   10 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c |    6 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.c     |   38 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_admin.h     |   36 +-
 .../src/pubsub_zmq_topic_receiver.c             |  121 +-
 .../src/pubsub_zmq_topic_receiver.h             |    1 +
 .../src/pubsub_zmq_topic_sender.c               |   49 +-
 .../src/pubsub_zmq_topic_sender.h               |    1 +
 .../pubsub_api/include/pubsub/publisher.h       |   21 +-
 .../pubsub_api/include/pubsub/subscriber.h      |   20 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c |    2 +-
 .../src/pubsub_discovery_impl.c                 |    2 +-
 .../src/pubsub_discovery_impl.h                 |    2 +-
 .../pubsub/pubsub_spi/include/pubsub_admin.h    |   18 +-
 .../pubsub_spi/include/pubsub_listeners.h       |    7 +-
 .../pubsub/pubsub_spi/include/pubsub_utils.h    |    2 +
 .../pubsub/pubsub_spi/src/pubsub_utils_match.c  |   12 +-
 .../src/pubsub_topology_manager.c               | 1407 ++++++++++--------
 .../src/pubsub_topology_manager.h               |    2 +-
 libs/utils/src/properties.c                     |    5 +-
 43 files changed, 1009 insertions(+), 1777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index e8113b9..2e43e6d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -16,7 +16,6 @@
 # under the License.
 
 add_subdirectory(pubsub)
-add_subdirectory(mp_pubsub)
 
 find_program(ETCD_CMD NAMES etcd)
 find_program(XTERM_CMD NAMES xterm)
@@ -198,33 +197,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     )
     target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
-    # ZMQ Multipart
-    add_celix_container("pubsub_mp_subscriber_zmq"
-            GROUP "pubsub"
-            BUNDLES
-            Celix::shell
-            Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
-            org.apache.celix.pubsub_subscriber.MpSubscriber
-            )
-    target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
-    add_celix_container("pubsub_mp_publisher_zmq"
-            GROUP "pubsub"
-            BUNDLES
-            Celix::shell
-            Celix::shell_tui
-            Celix::pubsub_serializer_json
-            Celix::pubsub_discovery_etcd
-            Celix::pubsub_topology_manager
-            Celix::pubsub_admin_zmq
-            org.apache.celix.pubsub_publisher.MpPublisher
-            )
-    target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
-
     if (ETCD_CMD AND XTERM_CMD)
         #Runtime starting two bundles using both zmq and upd mc pubsub
         add_celix_runtime(pubsub_rt_zmq_udpmc_combi
@@ -251,18 +223,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                 etcd
             USE_TERM
         )
-
-        #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq
-        add_celix_runtime(pubsub_rt_multipart_zmq
-            NAME zmq_multipart
-            GROUP pubsub
-            CONTAINERS
-                pubsub_mp_subscriber_zmq
-                pubsub_mp_publisher_zmq
-            COMMANDS
-                etcd
-            USE_TERM
-        )
     endif ()
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
deleted file mode 100644
index c828832..0000000
--- a/bundles/pubsub/examples/mp_pubsub/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-include_directories("common/include")
-
-add_subdirectory(publisher)
-add_subdirectory(subscriber)
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h b/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
deleted file mode 100644
index 81ca5f3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ew.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ew.h
- *
- *  \date       Jan 15, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef EW_H_
-#define EW_H_
-
-#define MIN_AREA	50.0F
-#define MAX_AREA	15000.0F
-
-#define MSG_EW_NAME	"ew" //Has to match the message name in the msg descriptor!
-
-typedef enum color{
-	GREEN,
-	BLUE,
-	RED,
-	BLACK,
-	WHITE,
-	LAST_COLOR
-} color_e;
-
-const char* color_tostring[] = {"GREEN","BLUE","RED","BLACK","WHITE"};
-
-struct ew_data{
-	double area;
-	color_e color;
-};
-
-typedef struct ew_data* ew_data_pt;
-
-#endif /* EW_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h b/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
deleted file mode 100644
index 2b9588d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/ide.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * ide.h
- *
- *  \date       Jan 15, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef IDE_H_
-#define IDE_H_
-
-#define MSG_IDE_NAME	"ide" //Has to match the message name in the msg descriptor!
-
-typedef enum shape{
-	SQUARE,
-	CIRCLE,
-	TRIANGLE,
-	RECTANGLE,
-	HEXAGON,
-	LAST_SHAPE
-} shape_e;
-
-const char* shape_tostring[] = {"SQUARE","CIRCLE","TRIANGLE","RECTANGLE","HEXAGON"};
-
-struct ide{
-	shape_e shape;
-};
-
-typedef struct ide* ide_pt;
-
-#endif /* IDE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h b/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
deleted file mode 100644
index 5601509..0000000
--- a/bundles/pubsub/examples/mp_pubsub/common/include/kinematics.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * kinematics.h
- *
- *  \date       Nov 12, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef KINEMATICS_H_
-#define KINEMATICS_H_
-
-#define MIN_LAT	-90.0F
-#define MAX_LAT	 90.0F
-#define MIN_LON	-180.0F
-#define MAX_LON	 180.0F
-
-#define MIN_OCCUR 1
-#define MAX_OCCUR 5
-
-#define MSG_KINEMATICS_NAME	"kinematics" //Has to match the message name in the msg descriptor!
-
-struct position{
-	double lat;
-	double lon;
-};
-
-typedef struct position position_t;
-
-struct kinematics{
-	position_t position;
-	int occurrences;
-};
-
-typedef struct kinematics* kinematics_pt;
-
-
-#endif /* KINEMATICS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
deleted file mode 100644
index 7eb8c29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ew
-version=1.0.0
-:annotations
-classname=org.example.Ew
-:types
-:message
-{Di area color}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
deleted file mode 100644
index f26286d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
+++ /dev/null
@@ -1,9 +0,0 @@
-:header
-type=message
-name=ide
-version=1.0.0
-:annotations
-classname=org.example.Ide
-:types
-:message
-{i shape}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor b/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
deleted file mode 100644
index 447b645..0000000
--- a/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
+++ /dev/null
@@ -1,10 +0,0 @@
-:header
-type=message
-name=kinematics
-version=1.0.0
-:annotations
-classname=org.example.Kinematics
-:types
-position={DD lat long}
-:message
-{lposition;N position occurrences}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
deleted file mode 100644
index 76c9bce..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_publisher.MpPublisher
-    SYMBOLIC_NAME "apache_celix_pubsub_mp_publisher"
-    VERSION "1.0.0"
-    SOURCES 
-    	private/src/mp_pub_activator.c
-    	private/src/mp_publisher.c
-)
-target_link_libraries(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_publisher.MpPublisher PRIVATE private/include)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-	DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
-    DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_publisher.MpPublisher
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
-    DESTINATION "META-INF/keys/subscriber"
-)

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h b/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
deleted file mode 100644
index 9c227fd..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/include/mp_publisher_private.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher_private.h
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef MP_PUBLISHER_PRIVATE_H_
-#define MP_PUBLISHER_PRIVATE_H_
-
-#include <celixbool.h>
-
-#include "pubsub/publisher.h"
-
-struct pubsub_sender {
-	array_list_pt trackers;
-	const char *ident;
-	long bundleId;
-};
-
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
-	pubsub_publisher_pt service;
-	pubsub_sender_pt publisher;
-} *send_thread_struct_pt;
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId);
-
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
-
-void publisher_destroy(pubsub_sender_pt client);
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service);
-
-
-#endif /* MP_PUBLISHER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
deleted file mode 100644
index 0b6041d..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_pub_activator.c
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_pub_activator.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <sys/cdefs.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "constants.h"
-
-#include "mp_publisher_private.h"
-
-static const char * PUB_TOPICS[] = {
-		"multipart",
-		NULL
-};
-
-
-struct publisherActivator {
-	pubsub_sender_pt client;
-	array_list_pt trackerList;//List<service_tracker_pt>
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	struct publisherActivator * act = malloc(sizeof(*act));
-
-	const char* fwUUID = NULL;
-
-	bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID == NULL){
-		printf("MP_PUBLISHER: Cannot retrieve fwUUID.\n");
-		status = CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-
-	if (status == CELIX_SUCCESS){
-		bundle_pt bundle = NULL;
-		long bundleId = 0;
-		bundleContext_getBundle(context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		arrayList_create(&(act->trackerList));
-		act->client = publisher_create(act->trackerList,fwUUID,bundleId);
-		*userData = act;
-	} else {
-		free(act);
-	}
-
-	return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-
-	int i;
-
-	char filter[128];
-	for(i=0; PUB_TOPICS[i] != NULL; i++){
-		const char* topic = PUB_TOPICS[i];
-
-		bundle_pt bundle = NULL;
-		long bundleId = 0;
-		bundleContext_getBundle(context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		service_tracker_pt tracker = NULL;
-		memset(filter,0,128);
-
-		snprintf(filter, 128, "(&(%s=%s)(%s=%s))", (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC,topic);
-
-		service_tracker_customizer_pt customizer = NULL;
-
-		serviceTrackerCustomizer_create(act->client,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer);
-		serviceTracker_createWithFilter(context, filter, customizer, &tracker);
-
-		arrayList_add(act->trackerList,tracker);
-	}
-
-	publisher_start(act->client);
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_open(tracker);
-	}
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_close(tracker);
-	}
-
-	publisher_stop(act->client);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt  __attribute__((unused)) context) {
-	struct publisherActivator * act = (struct publisherActivator *) userData;
-	int i;
-
-	for(i=0;i<arrayList_size(act->trackerList);i++){
-		service_tracker_pt tracker = arrayList_get(act->trackerList,i);
-		serviceTracker_destroy(tracker);
-	}
-
-	publisher_destroy(act->client);
-	arrayList_destroy(act->trackerList);
-
-	free(act);
-
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c b/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
deleted file mode 100644
index f47be29..0000000
--- a/bundles/pubsub/examples/mp_pubsub/publisher/private/src/mp_publisher.c
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_publisher.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "service_tracker.h"
-#include "celix_threads.h"
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_publisher_private.h"
-
-
-static bool stop=false;
-static celix_thread_t tid;
-
-static double randDouble(double min, double max){
-	double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
-	return ret;
-}
-
-static unsigned int randInt(unsigned int min, unsigned int max){
-	double scaled = ((double)random())/((double)RAND_MAX);
-	return (max - min +1)*scaled + min;
-}
-
-static void* send_thread(void* arg){
-
-	send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
-	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
-
-	unsigned int kin_msg_id = 0;
-	unsigned int ide_msg_id = 0;
-	unsigned int ew_msg_id = 0;
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
-		return NULL;
-	}
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
-		return NULL;
-	}
-
-	if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
-		printf("MP_PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
-		return NULL;
-	}
-
-	kinematics_pt kin_data = calloc(1,sizeof(*kin_data));
-	ide_pt ide_data = calloc(1,sizeof(*ide_data));
-	ew_data_pt ew_data = calloc(1,sizeof(*ew_data));
-
-	unsigned int counter = 1;
-
-	while(stop==false){
-		kin_data->position.lat = randDouble(MIN_LAT,MAX_LAT);
-		kin_data->position.lon = randDouble(MIN_LON,MAX_LON);
-		kin_data->occurrences = randInt(MIN_OCCUR,MAX_OCCUR);
-		publish_svc->sendMultipart(publish_svc->handle,kin_msg_id,kin_data, PUBSUB_PUBLISHER_FIRST_MSG);
-		printf("Track#%u kin_data: pos=[%f, %f] occurrences=%d\n",counter,kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-
-		ide_data->shape = (shape_e)randInt(0,LAST_SHAPE-1);
-		publish_svc->sendMultipart(publish_svc->handle,ide_msg_id,ide_data, PUBSUB_PUBLISHER_PART_MSG);
-		printf("Track#%u ide_data: shape=%s\n",counter,shape_tostring[(int)ide_data->shape]);
-
-		ew_data->area = randDouble(MIN_AREA,MAX_AREA);
-		ew_data->color = (color_e)randInt(0,LAST_COLOR-1);
-		publish_svc->sendMultipart(publish_svc->handle,ew_msg_id,ew_data, PUBSUB_PUBLISHER_LAST_MSG);
-		printf("Track#%u ew_data: area=%f color=%s\n",counter,ew_data->area,color_tostring[(int)ew_data->color]);
-
-		printf("\n");
-		sleep(2);
-		counter++;
-	}
-
-	free(ew_data);
-	free(ide_data);
-	free(kin_data);
-	free(st_struct);
-
-	return NULL;
-
-}
-
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
-	pubsub_sender_pt publisher = malloc(sizeof(*publisher));
-
-	publisher->trackers = trackers;
-	publisher->ident = ident;
-	publisher->bundleId = bundleId;
-
-	return publisher;
-}
-
-void publisher_start(pubsub_sender_pt client) {
-	printf("MP_PUBLISHER: starting up...\n");
-}
-
-void publisher_stop(pubsub_sender_pt client) {
-	printf("MP_PUBLISHER: stopping...\n");
-}
-
-void publisher_destroy(pubsub_sender_pt client) {
-	client->trackers = NULL;
-	client->ident = NULL;
-	free(client);
-}
-
-celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){
-	pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service;
-	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-
-	printf("MP_PUBLISHER: new publish service exported (%s).\n",manager->ident);
-
-	send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
-	data->service = publish_svc;
-	data->publisher = manager;
-
-	celixThread_create(&tid,NULL,send_thread,(void*)data);
-	return CELIX_SUCCESS;
-}
-
-celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){
-	//publish_service_pt publish_svc = (publish_service_pt)service;
-	pubsub_sender_pt manager = (pubsub_sender_pt)handle;
-	printf("MP_PUBLISHER: publish service unexported (%s)!\n",manager->ident);
-	stop=true;
-	celixThread_join(tid,NULL);
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
deleted file mode 100644
index 444764e..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-add_celix_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
-    SYMBOLIC_NAME "apache_celix_pubsub_mp_subscriber"
-    VERSION "1.0.0"
-    SOURCES 
-		private/src/mp_sub_activator.c
-		private/src/mp_subscriber.c
-)
-target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE Celix::framework Celix::pubsub_api)
-target_include_directories(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE private/include)
-
-celix_bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
-    ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
-	DESTINATION "META-INF/descriptors"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
-		${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber
-    DESTINATION "META-INF/keys"
-)
-
-celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
-	${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher/public
-    DESTINATION "META-INF/keys/publisher"
-)

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h b/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
deleted file mode 100644
index 2d582b3..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/include/mp_subscriber_private.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber_private.h
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_
-#define PUBSUB_SUBSCRIBER_PRIVATE_H_
-
-
-#include <string.h>
-
-#include "celixbool.h"
-
-#include "pubsub/subscriber.h"
-
-struct pubsub_receiver {
-	char * name;
-};
-
-typedef struct pubsub_receiver* pubsub_receiver_pt;
-
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
-
-#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
deleted file mode 100644
index 591a395..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_sub_activator.c
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_sub_activator.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "pubsub/subscriber.h"
-#include "bundle_activator.h"
-
-#include "mp_subscriber_private.h"
-
-#define SUB_NAME "multipart"
-static const char * SUB_TOPICS[] = {
-		"multipart",
-		NULL
-};
-
-struct subscriberActivator {
-	array_list_pt registrationList; //List<service_registration_pt>
-	pubsub_subscriber_t* subsvc;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator));
-	*userData = act;
-	arrayList_create(&(act->registrationList));
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-
-	pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
-	pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
-	subsvc->handle = sub;
-	subsvc->receive = pubsub_subscriber_recv;
-
-	act->subsvc = subsvc;
-
-	int i;
-	for(i=0; SUB_TOPICS[i] != NULL; i++){
-		const char* topic = SUB_TOPICS[i];
-
-		properties_pt props = properties_create();
-		properties_set(props, PUBSUB_SUBSCRIBER_TOPIC,topic);
-		service_registration_pt reg = NULL;
-		bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
-		arrayList_add(act->registrationList,reg);
-	}
-
-	subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-	int i;
-	for(i=0; i<arrayList_size(act->registrationList);i++){
-		service_registration_pt reg = arrayList_get(act->registrationList,i);
-		serviceRegistration_unregister(reg);
-
-	}
-
-	subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-
-	struct subscriberActivator * act = (struct subscriberActivator *) userData;
-
-	act->subsvc->receive = NULL;
-	subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
-	act->subsvc->handle = NULL;
-	free(act->subsvc);
-	act->subsvc = NULL;
-
-	arrayList_destroy(act->registrationList);
-	free(act);
-
-	return CELIX_SUCCESS;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c b/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
deleted file mode 100644
index a5ad03a..0000000
--- a/bundles/pubsub/examples/mp_pubsub/subscriber/private/src/mp_subscriber.c
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * mp_subscriber.c
- *
- *  \date       Sep 21, 2010
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "ew.h"
-#include "ide.h"
-#include "kinematics.h"
-#include "mp_subscriber_private.h"
-
-pubsub_receiver_pt subscriber_create(char* topics) {
-	pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
-	sub->name = strdup(topics);
-	return sub;
-}
-
-
-void subscriber_start(pubsub_receiver_pt subscriber){
-	printf("MP_SUBSCRIBER: starting up...\n");
-}
-
-void subscriber_stop(pubsub_receiver_pt subscriber){
-	printf("MP_SUBSCRIBER: stopping...\n");
-}
-
-void subscriber_destroy(pubsub_receiver_pt subscriber){
-	if(subscriber->name!=NULL){
-		free(subscriber->name);
-	}
-	subscriber->name=NULL;
-	free(subscriber);
-}
-
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
-
-	unsigned int kin_msg_id = 0;
-	unsigned int ide_msg_id = 0;
-	unsigned int ew_msg_id = 0;
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_KINEMATICS_NAME,&kin_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_KINEMATICS_NAME);
-		return -1;
-	}
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_IDE_NAME,&ide_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_IDE_NAME);
-		return -1;
-	}
-
-	if( callbacks->localMsgTypeIdForMsgType(callbacks->handle,(const char*)MSG_EW_NAME,&ew_msg_id) != 0 ){
-		printf("MP_SUBSCRIBER: Cannot retrieve msgId for message '%s'\n",MSG_EW_NAME);
-		return -1;
-	}
-
-	if(msgTypeId!=kin_msg_id){
-		printf("MP_SUBSCRIBER: Multipart Message started with wrong message (expected %u, got %u)\n",msgTypeId,kin_msg_id);
-		return -1;
-	}
-
-	kinematics_pt kin_data = (kinematics_pt)msg;
-
-	void* ide_msg = NULL;
-	callbacks->getMultipart(callbacks->handle,ide_msg_id,false,&ide_msg);
-
-	void* ew_msg = NULL;
-	callbacks->getMultipart(callbacks->handle,ew_msg_id,false,&ew_msg);
-
-	if(kin_data==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_KINEMATICS_NAME);
-	}
-	else{
-		printf("kin_data: pos=[%f, %f] occurrences=%d\n",kin_data->position.lat,kin_data->position.lon, kin_data->occurrences);
-	}
-
-	if(ide_msg==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_IDE_NAME);
-	}
-	else{
-		ide_pt ide_data = (ide_pt)ide_msg;
-		printf("ide_data: shape=%s\n",shape_tostring[(int)ide_data->shape]);
-	}
-
-	if(ew_msg==NULL){
-		printf("MP_SUBSCRIBER: Unexpected NULL data for message '%s'\n",MSG_EW_NAME);
-	}
-	else{
-		ew_data_pt ew_data = (ew_data_pt)ew_msg;
-		printf("ew_data: area=%f color=%s\n",ew_data->area,color_tostring[(int)ew_data->color]);
-	}
-
-	printf("\n");
-
-	return 0;
-
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 00ca9b4..291a6aa 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -45,7 +45,7 @@ void subscriber_start(pubsub_receiver_pt client);
 void subscriber_stop(pubsub_receiver_pt client);
 void subscriber_destroy(pubsub_receiver_pt client);
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release);
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
 
 
 #endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index a137253..7cfbedb 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -53,7 +53,7 @@ void subscriber_destroy(pubsub_receiver_pt subscriber){
 	free(subscriber);
 }
 
-int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release){
 
 	location_t place = (location_t)msg;
 	int nrchars = 25;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/mock/src/publisher_mock.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/mock/src/publisher_mock.cc b/bundles/pubsub/mock/src/publisher_mock.cc
index e8902a8..1bdae98 100644
--- a/bundles/pubsub/mock/src/publisher_mock.cc
+++ b/bundles/pubsub/mock/src/publisher_mock.cc
@@ -45,24 +45,10 @@ static int pubsub__publisherMock_send(void *handle, unsigned int msgTypeId, cons
 }
 
 /*============================================================================
-  MOCK - mock function for pubsub_publisher->sendMultipart
-  ============================================================================*/
-static int pubsub__publisherMock_sendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags) {
-    return mock(PUBSUB_PUBLISHERMOCK_SCOPE)
-        .actualCall(PUBSUB_PUBLISHERMOCK_SEND_MULTIPART_METHOD)
-        .withPointerParameter("handle", handle)
-        .withParameter("msgTypeId", msgTypeId)
-        .withPointerParameter("msg", (void*)msg)
-        .withParameter("flags", flags)
-        .returnIntValue();
-}
-
-/*============================================================================
   MOCK - mock setup for publisher service
   ============================================================================*/
 void pubsub_publisherMock_init(pubsub_publisher_t* srv, void* handle) {
     srv->handle = handle;
     srv->localMsgTypeIdForMsgType = pubsub__publisherMock_localMsgTypeIdForMsgType;
     srv->send = pubsub__publisherMock_send;
-    srv->sendMultipart = pubsub__publisherMock_sendMultipart;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
index c838899..63de809 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -71,13 +71,13 @@ int psa_udpmc_start(psa_udpmc_activator_t *act, celix_bundle_context_t *ctx) {
 		psaSvc->handle = act->admin;
 		psaSvc->matchPublisher = pubsub_udpmcAdmin_matchPublisher;
 		psaSvc->matchSubscriber = pubsub_udpmcAdmin_matchSubscriber;
-		psaSvc->matchEndpoint = pubsub_udpmcAdmin_matchEndpoint;
+		psaSvc->matchDiscoveredEndpoint = pubsub_udpmcAdmin_matchEndpoint;
 		psaSvc->setupTopicSender = pubsub_udpmcAdmin_setupTopicSender;
 		psaSvc->teardownTopicSender = pubsub_udpmcAdmin_teardownTopicSender;
 		psaSvc->setupTopicReceiver = pubsub_udpmcAdmin_setupTopicReceiver;
 		psaSvc->teardownTopicReceiver = pubsub_udpmcAdmin_teardownTopicReceiver;
-		psaSvc->addEndpoint = pubsub_udpmcAdmin_addEndpoint;
-		psaSvc->removeEndpoint = pubsub_udpmcAdmin_removeEndpoint;
+		psaSvc->addDiscoveredEndpoint = pubsub_udpmcAdmin_addEndpoint;
+		psaSvc->removeDiscoveredEndpoint = pubsub_udpmcAdmin_removeEndpoint;
 
 		celix_properties_t *props = celix_properties_create();
 		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_UDPMC_ADMIN_TYPE);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 6d06ee6..73fab20 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -244,23 +244,23 @@ void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa) {
     free(psa);
 }
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_UDPMC_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_udpmc_admin_t *psa = handle;
     L_DEBUG("[PSA_UDPMC] pubsub_udpmcAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_UDPMC_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
@@ -278,7 +278,7 @@ celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_propert
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -358,7 +358,7 @@ celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *s
     return status;
 }
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProps, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_udpmc_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index 02ebd44..17b8957 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@ -40,14 +40,14 @@ typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
 pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_udpmcAdmin_destroy(pubsub_udpmc_admin_t *psa);
 
-celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_udpmcAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
 celix_status_t pubsub_udpmcAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_udpmcAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
 void pubsub_udpmcAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index efebf7c..a2f37d4 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -396,13 +396,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
 
                 if (status == CELIX_SUCCESS) {
                     bool release = true;
-                    pubsub_multipart_callbacks_t mp_callbacks;
-                    mp_callbacks.handle = receiver;
-                    mp_callbacks.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
-                    mp_callbacks.getMultipart = NULL;
-
                     pubsub_subscriber_t *svc = entry->svc;
-                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+                    svc->receive(svc->handle, msgSer->msgName, msg->header.type, msgInst, &release);
 
                     if(release){
                         msgSer->freeMsg(msgSer,msgInst);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 59c809c..ddc6172 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -79,7 +79,7 @@ typedef struct pubsub_msg{
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback);
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg);
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 pubsub_udpmc_topic_sender_t* pubsub_udpmcTopicSender_create(
@@ -205,7 +205,6 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
             entry->service.send = psa_udpmc_topicPublicationSend;
-            entry->service.sendMultipart = NULL; //note multipart not supported by MCUDP
             hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
             svc = &entry->service;
         } else {
@@ -277,7 +276,7 @@ static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId,
             msg->payloadSize = serializedOutputLen;
 
 
-            if (psa_udpmc_sendMsg(entry, msg, true, NULL) == false) {
+            if (psa_udpmc_sendMsg(entry, msg) == false) {
                 status = -1;
             }
             free(msg);
@@ -306,7 +305,7 @@ static void delay_first_send_for_late_joiners(){
     }
 }
 
-static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback) {
+static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_msg_t* msg) {
     const int iovec_len = 3; // header + size + payload
     bool ret = true;
 
@@ -325,9 +324,6 @@ static bool psa_udpmc_sendMsg(psa_udpmc_bounded_service_entry_t *entry, pubsub_m
         ret = false;
     }
 
-    if(releaseCallback) {
-        releaseCallback->release(msg->payload, entry);
-    }
     return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
index 01547cc..58f093a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -70,13 +70,13 @@ int psa_zmq_start(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
 		psaSvc->handle = act->admin;
 		psaSvc->matchPublisher = pubsub_zmqAdmin_matchPublisher;
 		psaSvc->matchSubscriber = pubsub_zmqAdmin_matchSubscriber;
-		psaSvc->matchEndpoint = pubsub_zmqAdmin_matchEndpoint;
+		psaSvc->matchDiscoveredEndpoint = pubsub_zmqAdmin_matchDiscoveredEndpoint;
 		psaSvc->setupTopicSender = pubsub_zmqAdmin_setupTopicSender;
 		psaSvc->teardownTopicSender = pubsub_zmqAdmin_teardownTopicSender;
 		psaSvc->setupTopicReceiver = pubsub_zmqAdmin_setupTopicReceiver;
 		psaSvc->teardownTopicReceiver = pubsub_zmqAdmin_teardownTopicReceiver;
-		psaSvc->addEndpoint = pubsub_zmqAdmin_addEndpoint;
-		psaSvc->removeEndpoint = pubsub_zmqAdmin_removeEndpoint;
+		psaSvc->addDiscoveredEndpoint = pubsub_zmqAdmin_addDiscoveredEndpoint;
+		psaSvc->removeDiscoveredEndpoint = pubsub_zmqAdmin_removeDiscoveredEndpoint;
 
 		celix_properties_t *props = celix_properties_create();
 		celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26..0192bb4 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -308,30 +308,30 @@ void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_pr
     celixThreadMutex_unlock(&psa->serializers.mutex);
 }
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchPublisher");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchPublisher(psa->ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_ZMQ_ADMIN_TYPE,
-                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+                                                psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     *outScore = score;
 
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, long *outSerializerSvcId) {
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchSubscriber");
     celix_status_t  status = CELIX_SUCCESS;
     double score = pubsub_utils_matchSubscriber(psa->ctx, svcProviderBndId, svcProperties, PUBSUB_ZMQ_ADMIN_TYPE,
-            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, outSerializerSvcId);
+            psa->qosSampleScore, psa->qosControlScore, psa->defaultScore, topicProperties, outSerializerSvcId);
     if (outScore != NULL) {
         *outScore = score;
     }
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *outMatch) {
     pubsub_zmq_admin_t *psa = handle;
     L_DEBUG("[PSA_ZMQ] pubsub_zmqAdmin_matchEndpoint");
     celix_status_t  status = CELIX_SUCCESS;
@@ -342,7 +342,7 @@ celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_propertie
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outPublisherEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
@@ -353,6 +353,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
 
     celix_properties_t *newEndpoint = NULL;
 
+    const char * staticBindUrl = topicProperties != NULL ?
+            celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_BIND_URL, NULL) : NULL;
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
     celixThreadMutex_lock(&psa->serializers.mutex);
@@ -361,8 +363,8 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
     if (sender == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc, psa->ipAddress,
-                                                  psa->basePort, psa->maxPort);
+            sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc,
+                    psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
         }
         if (sender != NULL) {
             const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
@@ -370,6 +372,15 @@ celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope,
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                 serType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
+
+            //if configured, advertise the static discover url instead of the url the sender is bound to
+            const char *staticDiscUrl = topicProperties != NULL ?
+                    celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL) : NULL;
+            if (staticDiscUrl != NULL) {
+                celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl); //overwrite, a get would only read
+            }
+            celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL) {
@@ -423,11 +434,14 @@ celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *sco
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **outSubscriberEndpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     celix_properties_t *newEndpoint = NULL;
 
+    const char *staticConnectUrls = topicProperties != NULL ?
+            celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL) : NULL;
+
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -435,7 +449,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop
     if (receiver == NULL) {
         psa_zmq_serializer_entry_t *serEntry = hashMap_get(psa->serializers.map, (void*)serializerSvcId);
         if (serEntry != NULL) {
-            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, serializerSvcId, serEntry->svc);
+            receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, staticConnectUrls, serializerSvcId, serEntry->svc);
         } else {
             L_ERROR("[PSA_ZMQ] Cannot find serializer for TopicSender %s/%s", scope, topic);
         }
@@ -530,7 +544,7 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -581,7 +595,7 @@ static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_
     return status;
 }
 
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint) {
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) {
     pubsub_zmq_admin_t *psa = handle;
 
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
index 180a9db..304ad11 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h
@@ -35,23 +35,45 @@
 
 #define PUBSUB_ZMQ_DEFAULT_IP       "127.0.0.1"
 
+/**
+ * Can be set in the topic properties to fix a static bind url
+ */
+#define PUBSUB_ZMQ_STATIC_BIND_URL       "zmq.static.bind.url"
+
+/**
+ * Can be set in the topic properties to fix a static url used for discovery (key differs from the bind url key)
+ */
+#define PUBSUB_ZMQ_STATIC_DISCOVER_URL       "zmq.static.discover.url"
+
+/**
+ * If set true on the endpoint, the zmq TopicSender bind and/or discovery url is statically configured.
+ */
+#define PUBSUB_ZMQ_STATIC_CONFIGURED       "zmq.static.configured"
+
+/**
+ * The static urls which a subscriber should try to connect to.
+ * The urls are space separated.
+ */
+#define PUBSUB_ZMQ_STATIC_CONNECT_URLS    "zmq.static.connect.url"
+
+
 typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
 
 pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper);
 void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
 
-celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId);
-celix_status_t pubsub_zmqAdmin_matchEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
+celix_status_t pubsub_zmqAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchSubscriber(void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **topicProperties, double *score, long *serializerSvcId);
+celix_status_t pubsub_zmqAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint, bool *match);
 
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicSender(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint);
+celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t* topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint);
 celix_status_t pubsub_zmqAdmin_teardownTopicReceiver(void *handle, const char *scope, const char *topic);
 
-celix_status_t pubsub_zmqAdmin_addEndpoint(void *handle, const celix_properties_t *endpoint);
-celix_status_t pubsub_zmqAdmin_removeEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
+celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
 
 void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
 void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index b348fdc..bb190cc 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -66,38 +66,46 @@ struct pubsub_zmq_topic_receiver {
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t*
+        bool allConnected; //true if all requestedConnections are connected
     } requestedConnections;
 
     long subscriberTrackerId;
     struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t
+        bool allInitialized;
     } subscribers;
 };
 
 typedef struct psa_zmq_requested_connection_entry {
     char *url;
     bool connected;
+    bool statically; //true if the connection is statically configured through the topic properties.
 } psa_zmq_requested_connection_entry_t;
 
 typedef struct psa_zmq_subscriber_entry {
     int usageCount;
     hash_map_t *msgTypes; //map from serializer svc
     pubsub_subscriber_t *svc;
+    bool initialized; //true if the init function is called through the receive thread
 } psa_zmq_subscriber_entry_t;
 
 
 static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void* psa_zmq_recvThread(void * data);
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver);
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
+
 
 
 pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
                                                               log_helper_t *logHelper,
                                                               const char *scope,
-                                                                const char *topic,
-                                                                long serializerSvcId,
-                                                                pubsub_serializer_service_t *serializer) {
+                                                              const char *topic,
+                                                              const char *staticConnectUrls,
+                                                              long serializerSvcId,
+                                                              pubsub_serializer_service_t *serializer) {
     pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
@@ -185,6 +193,22 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
     }
 
+    if (receiver->zmqSocket != NULL && staticConnectUrls != NULL) {
+        char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
+        char* url;
+        char* save = urlsCopy;
+
+        while ((url = strtok_r(save, " ", &save))) {
+            psa_zmq_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
+            entry->statically = true;
+            entry->connected = false;
+            entry->url = strndup(url, 1024*1024);
+            hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+            receiver->requestedConnections.allConnected = false;
+        }
+        free(urlsCopy);
+    }
+
     //track subscribers
     if (receiver->zmqSocket != NULL ) {
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
@@ -280,10 +304,12 @@ void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiv
     hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
     while (hashMapIterator_hasNext(&iter)) {
         psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
         if (entry->connected) {
-            celix_arrayList_add(connectedUrls, strndup(entry->url, 1024));
+            celix_arrayList_add(connectedUrls, url);
         } else {
-            celix_arrayList_add(unconnectedUrls, strndup(entry->url, 1024));
+            celix_arrayList_add(unconnectedUrls, url);
         }
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
@@ -301,16 +327,13 @@ void pubsub_zmqTopicReceiver_connectTo(
         entry = calloc(1, sizeof(*entry));
         entry->url = strndup(url, 1024*1024);
         entry->connected = false;
+        entry->statically = false;
         hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
-    }
-    if (!entry->connected) {
-        if (zsock_connect(receiver->zmqSocket, "%s", url) == 0) {
-            entry->connected = true;
-        } else {
-            L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", url, strerror(errno));
-        }
+        receiver->requestedConnections.allConnected = false;
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    psa_zmq_connectToAllRequestedConnections(receiver);
 }
 
 void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) {
@@ -351,6 +374,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const
         entry = calloc(1, sizeof(*entry));
         entry->usageCount = 1;
         entry->svc = svc;
+        entry->initialized = false;
 
         int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
@@ -396,7 +420,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
             celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg);
             if(status == CELIX_SUCCESS) {
                 bool release = true;
-                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release);
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
@@ -428,7 +452,23 @@ static void* psa_zmq_recvThread(void * data) {
     bool running = receiver->recvThread.running;
     celixThreadMutex_unlock(&receiver->recvThread.mutex);
 
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    bool allConnected = receiver->requestedConnections.allConnected;
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    bool allInitialized = receiver->subscribers.allInitialized;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+
     while (running) {
+        if (!allConnected) {
+            psa_zmq_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_zmq_initializeAllSubscribers(receiver);
+        }
+
         zmsg_t *zmsg = zmsg_recv(receiver->zmqSocket);
         if (zmsg != NULL) {
             if (zmsg_size(zmsg) != 3) {
@@ -458,7 +498,62 @@ static void* psa_zmq_recvThread(void * data) {
         celixThreadMutex_lock(&receiver->recvThread.mutex);
         running = receiver->recvThread.running;
         celixThreadMutex_unlock(&receiver->recvThread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
     } // while
 
     return NULL;
 }
+
+
+static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool allConnected = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->connected){
+                if (zsock_connect(receiver->zmqSocket, "%s", entry->url) == 0) {
+                    entry->connected = true;
+                } else {
+                    L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno));
+                    allConnected = false;
+                }
+            }
+        }
+        receiver->requestedConnections.allConnected = allConnected;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver) {
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool allInitialized = true;
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+            if (!entry->initialized) {
+                int rc = 0;
+                if (entry->svc != NULL && entry->svc->init != NULL) {
+                    rc = entry->svc->init(entry->svc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
+                }
+            }
+        }
+        receiver->subscribers.allInitialized = allInitialized;
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
index 9049153..d093bfd 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.h
@@ -27,6 +27,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context
         log_helper_t *logHelper,
         const char *scope, 
         const char *topic,
+        const char *staticConnectUrls,
         long serializerSvcId,
         pubsub_serializer_service_t *serializer);
 void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index f6221a2..9ad7783 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -95,6 +95,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
         const char *bindIP,
+        const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort) {
     pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
@@ -172,29 +173,38 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
 	    }
 #endif
 
-        int rv = -1, retry=0;
-        while(rv==-1 && retry < ZMQ_BIND_MAX_RETRY ) {
-            /* Randomized part due to same bundle publishing on different topics */
-            unsigned int port = rand_range(basePort,maxPort);
+        if (staticBindUrl != NULL) { //statically configured url: a single bind attempt, no port retry
+            if (zsock_bind (socket, "%s", staticBindUrl) == -1) {
+                L_WARN("Error for zmq_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno));
+            } else {
+                sender->url = strndup(staticBindUrl, 1024*1024);
+                sender->zmq.socket = socket; //keep the bound socket, as the dynamic bind path does
+            }
+        } else {
 
-            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1;
-            char *url = calloc(len, sizeof(char*));
-            snprintf(url, len, "tcp://%s:%u", bindIP, port);
+            int retry = 0;
+            while (sender->url == NULL && retry < ZMQ_BIND_MAX_RETRY) {
+                /* Randomized part due to same bundle publishing on different topics */
+                unsigned int port = rand_range(basePort, maxPort);
 
-            len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
-            char *bindUrl = calloc(len, sizeof(char));
-            snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+                char *url = NULL;
+                asprintf(&url, "tcp://%s:%u", bindIP, port);
 
-            rv = zsock_bind (socket, "%s", bindUrl);
-            if (rv == -1) {
-                perror("Error for zmq_bind");
-                free(url);
-            } else {
-                sender->url = url;
-                sender->zmq.socket = socket;
+                char *bindUrl = NULL;
+                asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+
+
+                int rv = zsock_bind(socket, "%s", bindUrl);
+                if (rv == -1) {
+                    L_WARN("Error for zmq_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno));
+                    free(url);
+                } else {
+                    sender->url = url;
+                    sender->zmq.socket = socket;
+                }
+                retry++;
+                free(bindUrl);
             }
-            retry++;
-            free(bindUrl);
         }
     }
 
@@ -305,7 +315,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
             entry->service.send = psa_zmq_topicPublicationSend;
-            entry->service.sendMultipart = NULL; //not supported TODO remove
             hashMap_put(sender->boundedServices.map, (void*)bndId, entry);
         } else {
             L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);

http://git-wip-us.apache.org/repos/asf/celix/blob/b2548c84/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
index e537edd..af63870 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h
@@ -31,6 +31,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create(
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
         const char *bindIP,
+        const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort);
 void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);


Mime
View raw message