celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [celix] 01/02: Refactors usage locking in pstm so that recursive locks are not needed.
Date Tue, 30 Jun 2020 16:31:12 GMT
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/pstm_deadlock_work_outside_of_locks
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 2411b40ae226df154d1bb01106d462dfe3c31bf1
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
AuthorDate: Tue Jun 30 16:03:35 2020 +0200

    Refactors usage locking in pstm so that recursive locks are not needed.
---
 .../src/pubsub_topology_manager.c                  | 456 +++++++++++++--------
 .../src/pubsub_topology_manager.h                  |  14 +-
 2 files changed, 300 insertions(+), 170 deletions(-)

diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index da1ebbf..dfdda22 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -60,22 +60,11 @@ celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, ce
 
     manager->context = context;
 
-    celix_thread_mutexattr_t psaAttr;
-    celixThreadMutexAttr_create(&psaAttr);
-    celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-    status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr);
-    celixThreadMutexAttr_destroy(&psaAttr);
-
+    status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, NULL);
     status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL);
     status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
     status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
-
-    celix_thread_mutexattr_t attr;
-    status |= celixThreadMutexAttr_create(&attr);
-    status |= celixThreadMutexAttr_settype(&attr, CELIX_THREAD_MUTEX_RECURSIVE);
-    status |= celixThreadMutex_create(&manager->topicSenders.mutex, &attr);
-    status |= celixThreadMutexAttr_destroy(&attr);
-
+    status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
     status |= celixThreadMutex_create(&manager->psaMetrics.mutex, NULL);
     status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
 
@@ -190,7 +179,8 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
 
 
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
-    celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "PSTM: Added PSA");
+    const char* psaType = celix_properties_get(props, PUBSUB_ADMIN_SERVICE_TYPE, "!Error!");
+    celix_logHelper_debug(manager->loghelper, "Added %s PSA", psaType);
 
     if (svcId >= 0) {
         celixThreadMutex_lock(&manager->pubsubadmins.mutex);
@@ -200,14 +190,15 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
 
     //NOTE new psa, so no endpoints announce yet
 
-    //new PSA -> every topic sender/receiver entry needs a rematch
+    //Because this is a new PSA every topic sender/receiver entry needs a rematch.
+    //This is needed because the new PSA can be a better match than currently used.
     int needsRematchCount = 0;
 
     celixThreadMutex_lock(&manager->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        entry->needsMatch = true;
+        entry->matching.needsMatch = true;
         ++needsRematchCount;
     }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -215,14 +206,14 @@ void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_proper
     iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        entry->needsMatch = true;
+        entry->matching.needsMatch = true;
         ++needsRematchCount;
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
     if (needsRematchCount > 0) {
-        celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO,
-                      "A PSA is added after at least one active publisher/provided. \
+        celix_logHelper_info(manager->loghelper,
+                      "A new PSA is added after at least one active publisher/provided. \
                 It is preferred that all PSA are started before publiser/subscriber are started!\n\
                 Current topic/sender count is %i", needsRematchCount);
     }
@@ -233,6 +224,8 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
     pubsub_topology_manager_t *manager = handle;
     //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L);
+    const char* psaType = celix_properties_get(props, PUBSUB_ADMIN_SERVICE_TYPE, "!Error!");
+    celix_logHelper_debug(manager->loghelper, "Removing %s PSA", psaType);
 
     // Remove the svcId from the hashmap, because the service is not available
     celixThreadMutex_lock(&manager->pubsubadmins.mutex);
@@ -254,30 +247,21 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
     //de-setup all topic receivers/senders for the removed psa.
     //the next psaHandling run will try to find new psa.
 
+    celix_array_list_t* revokedEndpoints = celix_arrayList_create();
     celixThreadMutex_lock(&manager->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry->selectedPsaSvcId == svcId) {
-            /* de-announce all senders */
+        if (entry->matching.selectedPsaSvcId == svcId) {
             if (entry->endpoint != NULL) {
-                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
-                    pubsub_announce_endpoint_listener_t *listener;
-                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
-                    listener->revokeEndpoint(listener->handle, entry->endpoint);
-                }
-                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                celix_arrayList_add(revokedEndpoints, entry->endpoint);
             }
 
-            entry->needsMatch = true;
-            entry->selectedSerializerSvcId = -1L;
-            entry->selectedProtocolSvcId = -1L;
-            entry->selectedPsaSvcId = -1L;
-            if (entry->endpoint != NULL) {
-                celix_properties_destroy(entry->endpoint);
-                entry->endpoint = NULL;
-            }
+            entry->matching.needsMatch = true;
+            entry->matching.selectedSerializerSvcId = -1L;
+            entry->matching.selectedProtocolSvcId = -1L;
+            entry->matching.selectedPsaSvcId = -1L;
+            entry->endpoint = NULL;
         }
     }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -286,32 +270,33 @@ void pubsub_topologyManager_psaRemoved(void *handle, void *svc __attribute__((un
     iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry->selectedPsaSvcId == svcId) {
-            /* de-announce all receivers */
+        if (entry->matching.selectedPsaSvcId == svcId) {
             if (entry->endpoint != NULL) {
-                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
-                    pubsub_announce_endpoint_listener_t *listener;
-                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
-                    listener->revokeEndpoint(listener->handle, entry->endpoint);
-                }
-                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                celix_arrayList_add(revokedEndpoints, entry->endpoint);
             }
 
-            entry->needsMatch = true;
-            entry->selectedSerializerSvcId = -1L;
-            entry->selectedProtocolSvcId = -1L;
-            entry->selectedPsaSvcId = -1L;
-            if (entry->endpoint != NULL) {
-                celix_properties_destroy(entry->endpoint);
-                entry->endpoint = NULL;
-            }
+            entry->matching.needsMatch = true;
+            entry->matching.selectedSerializerSvcId = -1L;
+            entry->matching.selectedProtocolSvcId = -1L;
+            entry->matching.selectedPsaSvcId = -1L;
+            entry->endpoint = NULL;
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
-
-    celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "PSTM: Removed PSA");
+    /* de-announce all senders & receiver endpoints */
+    for (int i = 0; i < celix_arrayList_size(revokedEndpoints); ++i) {
+        celix_properties_t* endpoint = celix_arrayList_get(revokedEndpoints, i);
+        celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+        for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+            pubsub_announce_endpoint_listener_t *listener;
+            listener = celix_arrayList_get(manager->announceEndpointListeners.list, j);
+            listener->revokeEndpoint(listener->handle, endpoint);
+        }
+        celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+        celix_properties_destroy(endpoint);
+    }
+    celix_arrayList_destroy(revokedEndpoints);
 }
 
 void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) {
@@ -325,11 +310,12 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
     const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL);
     const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
     if (topic == NULL) {
-        celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING,
-                      "[PSTM] Warning found subscriber service without mandatory '%s' property.",
+        celix_logHelper_warning(manager->loghelper,
+                      "Warning found subscriber service without mandatory '%s' property.",
                       PUBSUB_SUBSCRIBER_TOPIC);
         return;
     }
+    celix_logHelper_trace(manager->loghelper, "Adding subscriber %s/%s", scope, topic);
 
     long bndId = celix_bundle_getId(bnd);
     char *scopeAndTopicKey = NULL;
@@ -346,12 +332,13 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_
         entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         entry->topic = strndup(topic, 1024 * 1024);
         entry->usageCount = 1;
-        entry->selectedPsaSvcId = -1L;
-        entry->selectedSerializerSvcId = -1L;
-        entry->needsMatch = true;
+        entry->matching.selectedPsaSvcId = -1L;
+        entry->matching.selectedSerializerSvcId = -1L;
+        entry->matching.needsMatch = true;
         entry->bndId = bndId;
         entry->subscriberProperties = celix_properties_copy(props);
         hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry);
+        celix_logHelper_trace(manager->loghelper, "Created new topic receiver entry %s", entry->scopeAndTopicKey);
     }
     //signal psa handling thread
     bool triggerCondition = (entry->usageCount == 1);
@@ -376,6 +363,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut
     if (topic == NULL) {
         return;
     }
+    celix_logHelper_trace(manager->loghelper, "Removing subscriber %s/%s", scope, topic);
 
     char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
@@ -393,15 +381,22 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
     pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
     pubsub_announce_endpoint_listener_t *listener = svc;
 
+    celix_logHelper_trace(manager->loghelper, "Added EndpointListener");
+
+
     //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints)
     //2) Add listener to manager->announceEndpointListeners
 
+    celix_array_list_t* announceEndpoints = celix_arrayList_create();
+
+    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+
     celixThreadMutex_lock(&manager->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL && entry->endpoint != NULL) {
-            listener->announceEndpoint(listener->handle, entry->endpoint);
+            celix_arrayList_add(announceEndpoints, celix_properties_copy(entry->endpoint));
         }
     }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
@@ -411,14 +406,20 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void *handle, vo
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
         if (entry != NULL && entry->endpoint != NULL) {
-            listener->announceEndpoint(listener->handle, entry->endpoint);
+            celix_arrayList_add(announceEndpoints, celix_properties_copy(entry->endpoint));
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
 
-    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
     celix_arrayList_add(manager->announceEndpointListeners.list, listener);
     celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+    for (int i = 0; i < celix_arrayList_size(announceEndpoints); ++i) {
+        celix_properties_t* endpoint = celix_arrayList_get(announceEndpoints, i);
+        listener->announceEndpoint(listener->handle, endpoint);
+        celix_properties_destroy(endpoint);
+    }
+    celix_arrayList_destroy(announceEndpoints);
 }
 
 
@@ -426,8 +427,7 @@ void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void *handle,
     pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *) handle;
     pubsub_announce_endpoint_listener_t *listener = svc;
 
-    //1) Remove listener from manager->announceEndpointListeners
-    //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints)
+    celix_logHelper_trace(manager->loghelper, "Removing EndpointListener");
 
     celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
     celix_arrayList_remove(manager->announceEndpointListeners.list, listener);
@@ -456,6 +456,8 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
                       PUBSUB_SUBSCRIBER_TOPIC);
         return;
     }
+    celix_logHelper_trace(manager->loghelper, "Adding new request for publisher %s/%s", scope, topic);
+
 
     scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
     celixThreadMutex_lock(&manager->topicSenders.mutex);
@@ -470,16 +472,17 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv
     } else {
         entry = calloc(1, sizeof(*entry));
         entry->usageCount = 1;
-        entry->selectedSerializerSvcId = -1L;
-        entry->selectedProtocolSvcId = -1L;
-        entry->selectedPsaSvcId = -1L;
+        entry->matching.needsMatch = true;
+        entry->matching.selectedSerializerSvcId = -1L;
+        entry->matching.selectedProtocolSvcId = -1L;
+        entry->matching.selectedPsaSvcId = -1L;
         entry->scope = scope; //taking ownership
         entry->topic = topic; //taking ownership
         entry->scopeAndTopicKey = scopeAndTopicKey; //taking ownership
-        entry->needsMatch = true;
         entry->publisherFilter = celix_filter_create(info->filter->filterStr);
         entry->bndId = info->bundleId;
         hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry);
+        celix_logHelper_trace(manager->loghelper, "Created new topic sender entry %s", entry->scopeAndTopicKey);
     }
     //new entry -> wakeup psaHandling thread
     bool triggerCondition = (entry->usageCount == 1);
@@ -510,6 +513,7 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se
         }
         return;
     }
+    celix_logHelper_trace(manager->loghelper, "Removing request for publisher %s/%s", scope, topic);
 
 
     char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
@@ -540,8 +544,8 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
     bool triggerCondition = false;
 
     if (manager->verbose) {
-        celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
-                      "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+        celix_logHelper_trace(manager->loghelper,
+                      "Adding discovered endpoint for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
@@ -562,6 +566,7 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
         entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
         entry->selectedPsaSvcId = -1L; //NOTE not selected a psa yet
         hashMap_put(manager->discoveredEndpoints.map, (void *) entry->uuid, entry);
+        celix_logHelper_trace(manager->loghelper, "Created new discovered endpoint entry %s", uuid);
 
         //waking up psa handling thread to select psa
         triggerCondition = true;
@@ -595,8 +600,8 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
     assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid.
 
     if (manager->verbose) {
-        celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
-                      "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+        celix_logHelper_trace(manager->loghelper,
+                      "Removing discovered endpoint for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "(null)"),
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
                       celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "(null)"),
@@ -625,6 +630,7 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
         } else {
             celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
         }
+        celix_logHelper_trace(manager->loghelper, "Destroying discovered endpoint entry %s", uuid);
         celix_properties_destroy(entry->endpoint);
         free(entry);
     }
@@ -633,39 +639,42 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
     return CELIX_SUCCESS;
 }
 
+struct pstm_teardown_entry {
+    char* scope;
+    char* topic;
+    long psaSvcId;
+};
 
 static void pstm_teardownTopicSenderCallback(void *handle, void *svc) {
-    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    struct pstm_teardown_entry* entry = handle;
     pubsub_admin_service_t *psa = svc;
     psa->teardownTopicSender(psa->handle, entry->scope, entry->topic);
 }
 
+//Note called on pstm update thread
 static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
+    celix_array_list_t* revokeEndpoints = celix_arrayList_create();
+    celix_array_list_t* teardownEntries = celix_arrayList_create();
+
     celixThreadMutex_lock(&manager->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
 
-        if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+        if (entry != NULL && (entry->usageCount <= 0 || entry->matching.needsMatch)) {
             if (manager->verbose && entry->endpoint != NULL) {
                 celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
-                              "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+                              "Tearing down TopicSender for scope/topic %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
             }
-
             if (entry->endpoint != NULL) {
-                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                    pubsub_announce_endpoint_listener_t *listener;
-                    listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-                    listener->revokeEndpoint(listener->handle, entry->endpoint);
-                }
-                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-                celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
-                                                     PUBSUB_ADMIN_SERVICE_NAME,
-                                                     entry, pstm_teardownTopicSenderCallback);
+                celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
+                struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
+                teardownEntry->scope = celix_utils_strdup(entry->scope);
+                teardownEntry->topic = celix_utils_strdup(entry->topic);
+                teardownEntry->psaSvcId = entry->matching.selectedPsaSvcId;
+                celix_arrayList_add(teardownEntries, teardownEntry);
             }
 
-
             //cleanup entry
             if (entry->usageCount <= 0) {
                 //no usage -> remove
@@ -691,46 +700,71 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
                     celix_properties_destroy(entry->endpoint);
                 }
                 entry->endpoint = NULL;
-                entry->selectedPsaSvcId = -1L;
-                entry->selectedSerializerSvcId = -1L;
-                entry->selectedProtocolSvcId = -1L;
+                entry->matching.selectedPsaSvcId = -1L;
+                entry->matching.selectedSerializerSvcId = -1L;
+                entry->matching.selectedProtocolSvcId = -1L;
             }
         }
     }
     celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+
+
+    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+        pubsub_announce_endpoint_listener_t *listener;
+        listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+        for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+            celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+            listener->revokeEndpoint(listener->handle, endpoint);
+            celix_properties_destroy(endpoint);
+        }
+    }
+    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+    celix_arrayList_destroy(revokeEndpoints);
+
+    for (int i = 0; i < celix_arrayList_size(teardownEntries); ++i) {
+        struct pstm_teardown_entry* entry = celix_arrayList_get(teardownEntries, i);
+        celix_bundleContext_useServiceWithId(manager->context, entry->psaSvcId,
+                                             PUBSUB_ADMIN_SERVICE_NAME,
+                                             entry, pstm_teardownTopicSenderCallback);
+        free(entry->scope);
+        free(entry->topic);
+        free(entry);
+    }
+    celix_arrayList_destroy(teardownEntries);
 }
 
 static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) {
-    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    struct pstm_teardown_entry* entry = handle;
     pubsub_admin_service_t *psa = svc;
     psa->teardownTopicReceiver(psa->handle, entry->scope, entry->topic);
 }
 
 static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
+    celix_array_list_t* revokeEndpoints = celix_arrayList_create();
+    celix_array_list_t* teardownEntries = celix_arrayList_create();
+
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
+        if (entry != NULL && (entry->usageCount <= 0 || entry->matching.needsMatch)) {
             if (manager->verbose && entry->endpoint != NULL) {
                 const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
                 const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
                 celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG,
-                              "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+                              "Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
                               entry->scope == NULL ? "(null)" : entry->scope, entry->topic, adminType, serType);
             }
 
             if (entry->endpoint != NULL) {
-                celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId,
-                                                     PUBSUB_ADMIN_SERVICE_NAME, entry,
-                                                     pstm_teardownTopicReceiverCallback);
-                celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                    pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
-                            manager->announceEndpointListeners.list, i);
-                    listener->revokeEndpoint(listener->handle, entry->endpoint);
-                }
-                celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
+                struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
+                teardownEntry->scope = celix_utils_strdup(entry->scope);
+                teardownEntry->topic = celix_utils_strdup(entry->topic);
+                teardownEntry->psaSvcId = entry->matching.selectedPsaSvcId;
+                celix_arrayList_add(teardownEntries, teardownEntry);
             }
 
             if (entry->usageCount <= 0) {
@@ -758,13 +792,37 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
                     celix_properties_destroy(entry->endpoint);
                 }
                 entry->endpoint = NULL;
-                entry->selectedPsaSvcId = -1L;
-                entry->selectedSerializerSvcId = -1L;
-                entry->selectedProtocolSvcId = -1L;
+                entry->matching.selectedPsaSvcId = -1L;
+                entry->matching.selectedSerializerSvcId = -1L;
+                entry->matching.selectedProtocolSvcId = -1L;
             }
         }
     }
     celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+        pubsub_announce_endpoint_listener_t *listener;
+        listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+        for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+            celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+            listener->revokeEndpoint(listener->handle, endpoint);
+            celix_properties_destroy(endpoint);
+        }
+    }
+    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+    celix_arrayList_destroy(revokeEndpoints);
+
+    for (int i = 0; i < celix_arrayList_size(teardownEntries); ++i) {
+        struct pstm_teardown_entry* entry = celix_arrayList_get(teardownEntries, i);
+        celix_bundleContext_useServiceWithId(manager->context, entry->psaSvcId,
+                                             PUBSUB_ADMIN_SERVICE_NAME,
+                                             entry, pstm_teardownTopicReceiverCallback);
+        free(entry->scope);
+        free(entry->topic);
+        free(entry);
+    }
+    celix_arrayList_destroy(teardownEntries);
 }
 
 static void pstm_addEndpointCallback(void *handle, void *svc) {
@@ -788,6 +846,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
                 pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry);
                 long svcId = (long) hashMapEntry_getKey(mapEntry);
                 bool match = false;
+                //NOTE assuming match is safe to call within a lock
                 psa->matchDiscoveredEndpoint(psa->handle, entry->endpoint, &match);
                 if (match) {
                     psaSvcId = svcId;
@@ -797,6 +856,7 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
             celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 
             if (psaSvcId >= 0) {
+                //NOTE assuming adding discovered endpoint is safe to call within a lock
                 celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
                                                      (void *) entry->endpoint, pstm_addEndpointCallback);
             } else {
@@ -809,18 +869,31 @@ static void pstm_findPsaForEndpoints(pubsub_topology_manager_t *manager) {
     celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
 }
 
+struct pstm_setup_entry {
+    char* scope;
+    char* topic;
+    char* key;
+    celix_properties_t* topicProperties;
+    long selectedSerializerSvcId;
+    long selectedProtocolSvcId;
+    long psaSvcId;
+    celix_properties_t* endpointResult;
+};
+
 static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
-    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    struct pstm_setup_entry *entry = handle;
     pubsub_admin_service_t *psa = svc;
-    psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
+    psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpointResult);
 }
 
 static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
+    celix_array_list_t* setupEntries = celix_arrayList_create();
+
     celixThreadMutex_lock(&manager->topicSenders.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
+        if (entry != NULL && entry->matching.needsMatch && entry->usageCount > 0) {
             //new topic sender needed, requesting match with current psa
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;
@@ -838,7 +911,9 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
                 long serSvcId = -1L;
                 long protSvcId = -1L;
                 celix_properties_t *topicProps = NULL;
-                psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId, &protSvcId);
+                //NOTE assuming matchPublisher is safe to call within lock
+                psa->matchPublisher(psa->handle, entry->bndId, entry->publisherFilter, &topicProps, &score, &serSvcId,
+                                    &protSvcId);
                 if (score > highestScore) {
                     if (topicPropertiesForHighestMatch != NULL) {
                         celix_properties_destroy(topicPropertiesForHighestMatch);
@@ -855,45 +930,73 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
             celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 
             if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-                entry->selectedPsaSvcId = selectedPsaSvcId;
-                entry->selectedSerializerSvcId = serializerSvcId;
-                entry->selectedProtocolSvcId = protocolSvcId;
-                entry->topicProperties = topicPropertiesForHighestMatch;
-                bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
-
-                if (called && entry->endpoint != NULL) {
-                    entry->needsMatch = false;
-
-                    //announce new endpoint through the network
-                    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                        pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
-                        listener->announceEndpoint(listener->handle, entry->endpoint);
-                    }
-                    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-                }
+                entry->matching.needsMatch = false;
+                //NOTE the needsMatch can be updated as soon as the lock manager->topicSenders.mutex is released.
+                //This is ok, because the next pstm update loop will then do a new teardown/setup
+
+                struct pstm_setup_entry *setupEntry = malloc(sizeof(*setupEntry)); //NOTE(review): malloc result is not checked
+                setupEntry->endpointResult = NULL; //set later by the psa setup callback
+                setupEntry->scope = celix_utils_strdup(entry->scope); //own copy, so the entry strings are not needed after the unlock
+                setupEntry->topic = celix_utils_strdup(entry->topic); //own copy, see scope
+                setupEntry->key = pubsubEndpoint_createScopeTopicKey(entry->scope, entry->topic); //used to find the entry again after the setup
+                setupEntry->topicProperties = topicPropertiesForHighestMatch;
+                setupEntry->psaSvcId = selectedPsaSvcId;
+                setupEntry->selectedSerializerSvcId = serializerSvcId;
+                setupEntry->selectedProtocolSvcId = protocolSvcId;
+                celix_arrayList_add(setupEntries, setupEntry); //handled after manager->topicSenders.mutex is released
+            }
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+    for (int i = 0; i < celix_arrayList_size(setupEntries); ++i) { //psa setup calls are done without holding manager->topicSenders.mutex
+        struct pstm_setup_entry* setupEntry = celix_arrayList_get(setupEntries, i);
+        bool called = celix_bundleContext_useServiceWithId(manager->context, setupEntry->psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, setupEntry, pstm_setupTopicSenderCallback);
+        if (called && setupEntry->endpointResult != NULL) {
+            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+            for (int k = 0; k < celix_arrayList_size(manager->announceEndpointListeners.list); ++k) {
+                pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, k); //index with k (listener loop), not i (setup entry loop)
+                listener->announceEndpoint(listener->handle, setupEntry->endpointResult);
             }
+            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
 
-            if (entry->needsMatch) {
-                celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+            celixThreadMutex_lock(&manager->topicSenders.mutex);
+            pstm_topic_receiver_or_sender_entry_t* entry = hashMap_get(manager->topicSenders.map, setupEntry->key); //NOTE(review): not NULL-checked; confirm the entry cannot be removed while the lock was released
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
             }
+            entry->endpoint = setupEntry->endpointResult; //previous endpoint (if any) was destroyed above
+            entry->topicProperties = setupEntry->topicProperties; //NOTE(review): previous value is overwritten without destroy; confirm it is NULL or owned elsewhere
+            entry->matching.selectedPsaSvcId = setupEntry->psaSvcId;
+            entry->matching.selectedSerializerSvcId = setupEntry->selectedSerializerSvcId;
+            entry->matching.selectedProtocolSvcId = setupEntry->selectedProtocolSvcId;
+            celixThreadMutex_unlock(&manager->topicSenders.mutex);
+        } else {
+            celix_logHelper_warning(manager->loghelper, "Cannot setup TopicSender for %s/%s\n", setupEntry->scope == NULL ? "(null)" : setupEntry->scope, setupEntry->topic);
         }
+        free(setupEntry->scope);
+        free(setupEntry->topic);
+        free(setupEntry->key);
+        free(setupEntry); //NOTE(review): setupEntry->topicProperties is not destroyed here; on the failure path nothing visible takes ownership, confirm
     }
-    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+    celix_arrayList_destroy(setupEntries);
 }
 
 static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
-    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    struct pstm_setup_entry *entry = handle; //handle is the setup entry given to celix_bundleContext_useServiceWithId
     pubsub_admin_service_t *psa = svc;
-    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpoint);
+    psa->setupTopicReceiver(psa->handle, entry->scope, entry->topic, entry->topicProperties, entry->selectedSerializerSvcId, entry->selectedProtocolSvcId, &entry->endpointResult);
 }
 
 static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
+    celix_array_list_t* setupEntries = celix_arrayList_create();
+
     celixThreadMutex_lock(&manager->topicReceivers.mutex);
     hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
         pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
-        if (entry != NULL && entry->needsMatch && entry->usageCount > 0) {
+        if (entry != NULL && entry->matching.needsMatch && entry->usageCount > 0) {
 
             double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
             long serializerSvcId = -1L;
@@ -912,7 +1015,8 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
                 long protSvcId = -1L;
                 celix_properties_t *topicProps = NULL;
 
-                psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score, &serSvcId, &protSvcId);
+                psa->matchSubscriber(psa->handle, entry->bndId, entry->subscriberProperties, &topicProps, &score,
+                                     &serSvcId, &protSvcId);
                 if (score > highestScore) {
                     if (highestMatchTopicProperties != NULL) {
                         celix_properties_destroy(highestMatchTopicProperties);
@@ -929,35 +1033,59 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
             celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 
             if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
-                entry->selectedPsaSvcId = selectedPsaSvcId;
-                entry->selectedSerializerSvcId = serializerSvcId;
-                entry->selectedProtocolSvcId = protocolSvcId;
-                entry->topicProperties = highestMatchTopicProperties;
-
-                bool called = celix_bundleContext_useServiceWithId(manager->context, selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
-                                                                   entry,
-                                                                   pstm_setupTopicReceiverCallback);
-
-                if (called && entry->endpoint != NULL) {
-                    entry->needsMatch = false;
-                    //announce new endpoint through the network
-                    celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
-                    for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
-                        pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(
-                                manager->announceEndpointListeners.list, i);
-                        listener->announceEndpoint(listener->handle, entry->endpoint);
-                    }
-                    celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
-                }
+                entry->matching.needsMatch = false;
+                //NOTE the needsMatch can be updated as soon as the lock manager->topicReceivers.mutex is released.
+                //This is ok, because the next pstm update loop will then do a new teardown/setup
+
+                struct pstm_setup_entry *setupEntry = malloc(sizeof(*setupEntry)); //NOTE(review): malloc result is not checked
+                setupEntry->endpointResult = NULL; //set later by the psa setup callback
+                setupEntry->scope = celix_utils_strdup(entry->scope); //own copy, so the entry strings are not needed after the unlock
+                setupEntry->topic = celix_utils_strdup(entry->topic); //own copy, see scope
+                setupEntry->key = pubsubEndpoint_createScopeTopicKey(entry->scope, entry->topic); //used to find the entry again after the setup
+                setupEntry->topicProperties = highestMatchTopicProperties;
+                setupEntry->psaSvcId = selectedPsaSvcId;
+                setupEntry->selectedSerializerSvcId = serializerSvcId;
+                setupEntry->selectedProtocolSvcId = protocolSvcId;
+                celix_arrayList_add(setupEntries, setupEntry); //handled after manager->topicReceivers.mutex is released
             }
+        }
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
 
 
-            if (entry->needsMatch) {
-                celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
+    for (int i = 0; i < celix_arrayList_size(setupEntries); ++i) { //psa setup calls are done without holding manager->topicReceivers.mutex
+        struct pstm_setup_entry* setupEntry = celix_arrayList_get(setupEntries, i);
+        bool called = celix_bundleContext_useServiceWithId(manager->context, setupEntry->psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, setupEntry, pstm_setupTopicReceiverCallback);
+        if (called && setupEntry->endpointResult != NULL) {
+            celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+            for (int k = 0; k < celix_arrayList_size(manager->announceEndpointListeners.list); ++k) {
+                pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, k); //index with k (listener loop), not i (setup entry loop)
+                listener->announceEndpoint(listener->handle, setupEntry->endpointResult);
             }
+            celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+            celixThreadMutex_lock(&manager->topicReceivers.mutex);
+            pstm_topic_receiver_or_sender_entry_t* entry = hashMap_get(manager->topicReceivers.map, setupEntry->key); //NOTE(review): not NULL-checked; confirm the entry cannot be removed while the lock was released
+            if (entry->endpoint != NULL) {
+                celix_properties_destroy(entry->endpoint);
+            }
+            entry->endpoint = setupEntry->endpointResult; //previous endpoint (if any) was destroyed above
+            entry->topicProperties = setupEntry->topicProperties; //NOTE(review): previous value is overwritten without destroy; confirm it is NULL or owned elsewhere
+            entry->matching.selectedPsaSvcId = setupEntry->psaSvcId;
+            entry->matching.selectedSerializerSvcId = setupEntry->selectedSerializerSvcId;
+            entry->matching.selectedProtocolSvcId = setupEntry->selectedProtocolSvcId;
+            celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+        } else {
+            celix_logHelper_warning(manager->loghelper, "Cannot setup TopicReceiver for %s/%s\n", setupEntry->scope == NULL ? "(null)" : setupEntry->scope, setupEntry->topic);
         }
+        free(setupEntry->scope);
+        free(setupEntry->topic);
+        free(setupEntry->key);
+        free(setupEntry); //NOTE(review): setupEntry->topicProperties is not destroyed here; on the failure path nothing visible takes ownership, confirm
     }
-    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+    celix_arrayList_destroy(setupEntries);
 }
 
 static void *pstm_psaHandlingThread(void *data) {
@@ -1058,9 +1186,9 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         fprintf(os, "   |- serializer  = %s\n", serType);
         fprintf(os, "   |- protocol    = %s\n", protType);
         if (manager->verbose) {
-            fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
-            fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
-            fprintf(os, "   |- prot svc id = %li\n", entry->selectedProtocolSvcId);
+            fprintf(os, "   |- psa svc id  = %li\n", entry->matching.selectedPsaSvcId);
+            fprintf(os, "   |- ser svc id  = %li\n", entry->matching.selectedSerializerSvcId);
+            fprintf(os, "   |- prot svc id = %li\n", entry->matching.selectedProtocolSvcId);
             fprintf(os, "   |- usage count = %i\n", entry->usageCount);
         }
     }
@@ -1088,9 +1216,9 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         fprintf(os, "   |- serializer  = %s\n", serType);
         fprintf(os, "   |- protocol    = %s\n", protType);
         if (manager->verbose) {
-            fprintf(os, "    |- psa svc id  = %li\n", entry->selectedPsaSvcId);
-            fprintf(os, "    |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
-            fprintf(os, "    |- prot svc id = %li\n", entry->selectedProtocolSvcId);
+            fprintf(os, "    |- psa svc id  = %li\n", entry->matching.selectedPsaSvcId);
+            fprintf(os, "    |- ser svc id  = %li\n", entry->matching.selectedSerializerSvcId);
+            fprintf(os, "    |- prot svc id = %li\n", entry->matching.selectedProtocolSvcId);
             fprintf(os, "    |- usage count = %i\n", entry->usageCount);
         }
     }
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 853811b..9f5ef49 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -87,16 +87,11 @@ typedef struct pstm_discovered_endpoint_entry {
 } pstm_discovered_endpoint_entry_t;
 
 typedef struct pstm_topic_receiver_or_sender_entry {
-    bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
-
     char *scopeAndTopicKey; //key of the combined value of the scope and topic
     celix_properties_t *endpoint;
     char *topic;
     char *scope;
-    int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic)
-    long selectedPsaSvcId;
-    long selectedSerializerSvcId;
-    long selectedProtocolSvcId;
+    int usageCount; //nr of provided subscriber services for the topic receiver (matching scope & topic) or nr of publisher requested for matching scope & topic.
     long bndId;
     celix_properties_t *topicProperties; //found in META-INF/(pub|sub)/(topic).properties
 
@@ -105,6 +100,13 @@ typedef struct pstm_topic_receiver_or_sender_entry {
 
     //for receiver entry
     celix_properties_t *subscriberProperties;
+
+    struct {
+        bool needsMatch; //true if a psa needs to be selected or if a new psa has to be considered.
+        long selectedPsaSvcId;
+        long selectedSerializerSvcId;
+        long selectedProtocolSvcId;
+    } matching;
 } pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, pubsub_topology_manager_t **manager);


Mime
View raw message