celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From erjanalt...@apache.org
Subject [1/2] celix git commit: admin mutexes replaced by std::mutex and lock_guard
Date Thu, 01 Nov 2018 20:40:43 GMT
Repository: celix
Updated Branches:
  refs/heads/nanomsg cb740b0d4 -> 3009e6470


admin mutexes replaced by std::mutex and lock_guard


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95892a85
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95892a85
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95892a85

Branch: refs/heads/nanomsg
Commit: 95892a8577107eb4fa93410ae08913d51407763b
Parents: cb740b0
Author: Erjan Altena <erjanaltena@gmail.com>
Authored: Thu Nov 1 21:17:02 2018 +0100
Committer: Erjan Altena <erjanaltena@gmail.com>
Committed: Thu Nov 1 21:17:02 2018 +0100

----------------------------------------------------------------------
 .../src/psa_nanomsg_activator.cc                |   4 -
 .../src/pubsub_nanomsg_admin.cc                 | 329 +++++++++----------
 .../src/pubsub_nanomsg_admin.h                  |  10 +-
 3 files changed, 166 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
index ec3ee7d..e599f01 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc
@@ -87,10 +87,6 @@ private:
     LogHelper logHelper;
 	pubsub_nanomsg_admin admin;
 
-
-//    command_service_t cmdSvc{};
-
-//	long cmdSvcId = -1L;
 };
 
 celix_status_t  celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData)
{

http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index bd1d0a5..c10431f 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -105,64 +105,59 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx,
log_hel
     qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY,
PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
     qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY,
PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadMutex_create(&serializers.mutex, nullptr);
     serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
 
-    celixThreadMutex_create(&topicSenders.mutex, nullptr);
     topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&topicReceivers.mutex, nullptr);
     topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr);
 
-    celixThreadMutex_create(&discoveredEndpoints.mutex, nullptr);
     discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals,
nullptr);
 }
 
 pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
-
-    celixThreadMutex_lock(&topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
-        pubsub_nanoMsgTopicSender_destroy(sender);
+    {
+        std::lock_guard<std::mutex> lock(topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter));
+            pubsub_nanoMsgTopicSender_destroy(sender);
+        }
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
 
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    iter = hashMapIterator_construct(topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-        pubsub_nanoMsgTopicReceiver_destroy(recv);
+    {
+        std::lock_guard<std::mutex> lock(topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+            pubsub_nanoMsgTopicReceiver_destroy(recv);
+        }
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
 
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
-    iter = hashMapIterator_construct(discoveredEndpoints.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
-        celix_properties_destroy(ep);
+    {
+        std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+            celix_properties_destroy(ep);
+        }
     }
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
-    celixThreadMutex_lock(&serializers.mutex);
-    iter = hashMapIterator_construct(serializers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
-        free(entry);
+    {
+        std::lock_guard<std::mutex> lock(serializers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(serializers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMapIterator_nextValue(&iter));
+            free(entry);
+        }
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 
-    celixThreadMutex_destroy(&topicSenders.mutex);
     hashMap_destroy(topicSenders.map, true, false);
 
-    celixThreadMutex_destroy(&topicReceivers.mutex);
     hashMap_destroy(topicReceivers.map, true, false);
 
-    celixThreadMutex_destroy(&discoveredEndpoints.mutex);
     hashMap_destroy(discoveredEndpoints.map, false, false);
 
-    celixThreadMutex_destroy(&serializers.mutex);
     hashMap_destroy(serializers.map, false, false);
 
     free(ipAddress);
@@ -259,16 +254,17 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t
         return;
     }
 
-    celixThreadMutex_lock(&serializers.mutex);
-    auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)svcId));
-    if (entry == nullptr) {
-        entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
-        entry->serType = serType;
-        entry->svcId = svcId;
-        entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-        hashMap_put(serializers.map, (void*)svcId, entry);
+    {
+        std::lock_guard<std::mutex> lock(serializers.mutex);
+        auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)svcId));
+        if (entry == nullptr) {
+            entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(*entry)));
+            entry->serType = serType;
+            entry->svcId = svcId;
+            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
+            hashMap_put(serializers.map, (void*)svcId, entry);
+        }
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 }
 
 
@@ -281,40 +277,41 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const
celix_proper
     // 3) loop and destroy all topic receivers using the serializer
     // Note that it is the responsibility of the topology manager to create new topic senders/receivers
 
-    celixThreadMutex_lock(&serializers.mutex);
+    std::lock_guard<std::mutex> lock(serializers.mutex);
     auto *entry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_remove(serializers.map,
(void*)svcId));
     if (entry != nullptr) {
-        celixThreadMutex_lock(&topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapEntry_getValue(senderEntry));
-            if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender))
{
-                char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                hashMapIterator_remove(&iter);
-                pubsub_nanoMsgTopicSender_destroy(sender);
-                free(key);
+        {
+            std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
+            hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+                auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry));
+                if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender))
{
+                    char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry));
+                    hashMapIterator_remove(&iter);
+                    pubsub_nanoMsgTopicSender_destroy(sender);
+                    free(key);
+                }
             }
         }
-        celixThreadMutex_unlock(&topicSenders.mutex);
 
-        celixThreadMutex_lock(&topicReceivers.mutex);
-        iter = hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
-            auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
-            if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver))
{
-                char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                hashMapIterator_remove(&iter);
-                pubsub_nanoMsgTopicReceiver_destroy(receiver);
-                free(key);
+        {
+            std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
+            hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
+                auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+                if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver))
{
+                    char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry));
+                    hashMapIterator_remove(&iter);
+                    pubsub_nanoMsgTopicReceiver_destroy(receiver);
+                    free(key);
+                }
             }
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
 
         free(entry);
     }
-    celixThreadMutex_unlock(&serializers.mutex);
 }
 
 celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t
*svcFilter,
@@ -353,7 +350,7 @@ celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t
*end
 
 celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic,
                                                     long serializerSvcId, celix_properties_t
**outPublisherEndpoint) {
-    celix_status_t  status = CELIX_SUCCESS;
+    celix_status_t status = CELIX_SUCCESS;
 
     //1) Create TopicSender
     //2) Store TopicSender
@@ -363,21 +360,22 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope,
const c
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicSenders.mutex);
-    auto *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_get(topicSenders.map,
key));
+    pubsub_nanomsg_topic_sender_t *sender = nullptr;
+    std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+    sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map,
key));
     if (sender == nullptr) {
-        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)serializerSvcId));
+        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+                                                                                   (void
*) serializerSvcId));
         if (serEntry != nullptr) {
-            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId,
serEntry->svc,
-                                                      ipAddress, basePort, maxPort);
+            sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId,
serEntry->svc, ipAddress,
+                                                      basePort, maxPort);
         }
         if (sender != nullptr) {
             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
             const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
psaType,
-                                                serType, nullptr);
+            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
psaType, serType,
+                                                nullptr);
             celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender));
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME",
nullptr);
@@ -393,9 +391,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope,
const c
         free(key);
         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic
%s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
     if (sender != nullptr && newEndpoint != nullptr) {
         //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender?
     }
@@ -414,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope,
cons
     //2) destroy topic sender
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&topicSenders.mutex);
+    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
     if (entry != nullptr) {
         char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
@@ -425,7 +420,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope,
cons
     } else {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not
exists", scope, topic);
     }
-    celixThreadMutex_unlock(&topicSenders.mutex);
     free(key);
 
     return status;
@@ -437,41 +431,41 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope,
const
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMap_get(topicReceivers.map,
key));
-    if (receiver == nullptr) {
-        auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)serializerSvcId));
-        if (serEntry != nullptr) {
-            receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
-                                                          serEntry->svc);
-        } else {
-            L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope,
topic);
-        }
-        if (receiver != nullptr) {
-            const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-            const char *serType = serEntry->serType;
-            newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic,
-                                                PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
serType, nullptr);
-            //if available also set container name
-            const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME",
nullptr);
-            if (cn != nullptr) {
-                celix_properties_set(newEndpoint, "container_name", cn);
+    pubsub_nanomsg_topic_receiver_t * receiver = nullptr;
+    {
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+        receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map,
key));
+        if (receiver == nullptr) {
+            auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map,
+                                                                                       (void
*) serializerSvcId));
+            if (serEntry != nullptr) {
+                receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId,
serEntry->svc);
+            } else {
+                L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope,
topic);
+            }
+            if (receiver != nullptr) {
+                const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
+                const char *serType = serEntry->serType;
+                newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE,
psaType,
+                                                    serType, nullptr);
+                //if available also set container name
+                const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME",
nullptr);
+                if (cn != nullptr) {
+                    celix_properties_set(newEndpoint, "container_name", cn);
+                }
+                hashMap_put(topicReceivers.map, key, receiver);
+            } else {
+                L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
+                free(key);
             }
-            hashMap_put(topicReceivers.map, key, receiver);
         } else {
-            L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
             free(key);
+            L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic
%s/%s!", scope, topic);
         }
-    } else {
-        free(key);
-        L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic
%s/%s!", scope, topic);
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
     if (receiver != nullptr && newEndpoint != nullptr) {
-        celixThreadMutex_lock(&discoveredEndpoints.mutex);
+        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map);
         while (hashMapIterator_hasNext(&iter)) {
             auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
@@ -480,7 +474,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope,
const
                 connectEndpointToReceiver(receiver, endpoint);
             }
         }
-        celixThreadMutex_unlock(&discoveredEndpoints.mutex);
     }
 
     if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
@@ -493,7 +486,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope,
const
 
 celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char
*topic) {
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    celixThreadMutex_lock(&topicReceivers.mutex);
+    std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
     hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
     free(key);
     if (entry != nullptr) {
@@ -504,7 +497,6 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char
*scope, co
         free(receiverKey);
         pubsub_nanoMsgTopicReceiver_destroy(receiver);
     }
-    celixThreadMutex_lock(&topicReceivers.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -542,20 +534,18 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t
*endpo
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE))
== 0) {
-        celixThreadMutex_lock(&topicReceivers.mutex);
+        std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
             connectEndpointToReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
 
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
+    std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
     const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr);
     hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -592,20 +582,19 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t
*en
     const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr);
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE))
== 0) {
-        celixThreadMutex_lock(&topicReceivers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
         hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
         while (hashMapIterator_hasNext(&iter)) {
             pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
-        celixThreadMutex_unlock(&topicReceivers.mutex);
     }
-
-    celixThreadMutex_lock(&discoveredEndpoints.mutex);
-    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
-    celix_properties_t *found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map,
(void*)uuid));
-    celixThreadMutex_unlock(&discoveredEndpoints.mutex);
-
+    celix_properties_t *found = nullptr;
+    {
+        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
+        const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr);
+        found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map,
(void*)uuid));
+    }
     if (found != nullptr) {
         celix_properties_destroy(found);
     }
@@ -620,52 +609,56 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine
__attribut
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicSenders.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMapIterator_nextValue(&iter));
-        long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)serSvcId));
-        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
-        const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
-        const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
-        const char *url = pubsub_nanoMsgTopicSender_url(sender);
-        fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
-        fprintf(out, "   |- serializer type = %s\n", serType);
-        fprintf(out, "   |- url             = %s\n", url);
-    }
-    celixThreadMutex_unlock(&topicSenders.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
-
-    fprintf(out, "\n");
-    fprintf(out, "\nTopic Receivers:\n");
-    celixThreadMutex_lock(&serializers.mutex);
-    celixThreadMutex_lock(&topicReceivers.mutex);
-    iter = hashMapIterator_construct(topicReceivers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-        long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
-        psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t*>(hashMap_get(serializers.map,
(void*)serSvcId));
-        const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
-        const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
-        const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
-
-        std::vector<std::string> connected{};
-        std::vector<std::string> unconnected{};
-        pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
-
-        fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
-        fprintf(out, "   |- serializer type = %s\n", serType);
-        for (auto url : connected) {
-            fprintf(out, "   |- connected url   = %s\n", url.c_str());
+    {
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t
*>(hashMapIterator_nextValue(
+                    &iter));
+            long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
+            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t
*>(hashMap_get(
+                    serializers.map, (void *) serSvcId));
+            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
+            const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
+            const char *url = pubsub_nanoMsgTopicSender_url(sender);
+            fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+            fprintf(out, "   |- serializer type = %s\n", serType);
+            fprintf(out, "   |- url             = %s\n", url);
         }
-        for (auto url : unconnected) {
-            fprintf(out, "   |- unconnected url = %s\n", url.c_str());
+    }
+
+    {
+        fprintf(out, "\n");
+        fprintf(out, "\nTopic Receivers:\n");
+        std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
+        hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t
*>(hashMapIterator_nextValue(
+                    &iter));
+            long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
+            psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t
*>(hashMap_get(
+                    serializers.map, (void *) serSvcId));
+            const char *serType = serEntry == nullptr ? "!Error!" : serEntry->serType;
+            const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver);
+            const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver);
+
+            std::vector<std::string> connected{};
+            std::vector<std::string> unconnected{};
+            pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected);
+
+            fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
+            fprintf(out, "   |- serializer type = %s\n", serType);
+            for (auto url : connected) {
+                fprintf(out, "   |- connected url   = %s\n", url.c_str());
+            }
+            for (auto url : unconnected) {
+                fprintf(out, "   |- unconnected url = %s\n", url.c_str());
+            }
         }
     }
-    celixThreadMutex_unlock(&topicReceivers.mutex);
-    celixThreadMutex_unlock(&serializers.mutex);
     fprintf(out, "\n");
 
     return status;

http://git-wip-us.apache.org/repos/asf/celix/blob/95892a85/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 385b400..b06c887 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -20,6 +20,7 @@
 #ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
 #define CELIX_PUBSUB_ZMQ_ADMIN_H
 
+#include <mutex>
 #include <pubsub_admin.h>
 #include "celix_api.h"
 #include "log_helper.h"
@@ -98,22 +99,22 @@ private:
     bool verbose{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t*
     } serializers{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
     } topicSenders{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t*
     } topicReceivers{};
 
     struct {
-        celix_thread_mutex_t mutex;
+        std::mutex mutex;
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
     } discoveredEndpoints{};
 
@@ -127,7 +128,6 @@ extern "C" {
 }
 #endif
 
-celix_status_t pubsub_nanoMsgAdmin_executeCommand(void *handle, char *commandLine, FILE *outStream,
FILE *errStream);
 
 #endif //CELIX_PUBSUB_ZMQ_ADMIN_H
 


Mime
View raw message