celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [celix] 02/02: Removes adding of default scope in pubsub discovery.
Date Mon, 29 Jun 2020 14:27:00 GMT
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections
in repository https://gitbox.apache.org/repos/asf/celix.git

commit e10d9835c8fba6c4086406d36841431e9810fdd6
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
AuthorDate: Mon Jun 29 16:26:30 2020 +0200

    Removes adding of default scope in pubsub discovery.
    
    If no scope is set for a publisher/subscriber, this needs to be reflected in the
    endpoints so that correct matching can occur.
---
 .../examples/pubsub/publisher/CMakeLists.txt       |  1 +
 .../examples/pubsub/subscriber/CMakeLists.txt      |  2 +-
 .../src/pubsub_tcp_topic_receiver.c                |  9 ++++---
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  6 ++++-
 .../src/pubsub_zmq_topic_receiver.c                |  6 ++++-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |  6 ++++-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   | 29 ++++++++++++----------
 .../pubsub/pubsub_spi/include/pubsub_constants.h   |  5 ----
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    |  2 --
 .../src/pubsub_topology_manager.c                  | 18 +++++++-------
 10 files changed, 47 insertions(+), 37 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
index c68d533..3b9ea50 100644
--- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@ -25,6 +25,7 @@ add_celix_bundle(celix_pubsub_poi_publisher
 
 target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
+target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE)
 
 celix_bundle_files(celix_pubsub_poi_publisher
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 94acdb2..8bee482 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber
 
 target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
 target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
-
+target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE)
 
 celix_bundle_files(celix_pubsub_poi_subscriber
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index eb6afbd..8ecb7df 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -641,10 +641,11 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    if (receiver->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 47dc888..809ab7f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -449,7 +449,11 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t
*re
 
 pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL
? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
+    if (sender->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
index 088474a..2ae58af 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c
@@ -711,7 +711,11 @@ static void* psa_zmq_recvThread(void * data) {
 
 pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope ==
NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    if (receiver->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 413f1b3..85c488b 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -464,7 +464,11 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t
*re
 
 pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL
? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
+    if (sender->scope != NULL) {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
+    } else {
+        snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "");
+    }
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index c6d1aa9..5e1b702 100644
--- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@ -398,7 +398,11 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const
celix_prope
         clock_gettime(CLOCK_MONOTONIC, &entry->createTime);
         entry->isSet = false;
         entry->properties = celix_properties_copy(endpoint);
-        asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE
: scope, topic, uuid);
+        if (scope == NULL) {
+            asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid);
+        } else {
+            asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid);
+        }
 
         const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID,
NULL);
         celixThreadMutex_lock(&disc->announcedEndpointsMutex);
@@ -409,7 +413,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope
         celixThreadCondition_broadcast(&disc->waitCond);
         celixThreadMutex_unlock(&disc->runningMutex);
     } else if (valid) {
-        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured
visibility is %s\n", scope == NULL ? "(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY,
visibility);
+        L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility);
     }
 
     if (!valid) {
@@ -464,8 +468,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t
*disc, cel
         if (disc->verbose) {
             const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
             const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE,
"!Error!");
-            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY,
"!Error!");
-            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+            const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY,
"(no serialization)");
+            const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no
protocol)");
             L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer
is %s, protocol is %s.\n",
                    uuid, type, admin, ser, prot);
         }
@@ -495,8 +499,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t
*disc,
     if (disc->verbose) {
         const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!");
         const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!");
-        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!");
+        const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no
serialization)");
+        const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no
protocol)");
         L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer
is %s, protocol = %s.\n",
                uuid, type, admin, ser, prot);
     }
@@ -572,7 +576,6 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine
__at
 
     struct timespec now;
     clock_gettime(CLOCK_MONOTONIC, &now);
-    //TODO add support for query (scope / topic)
 
     fprintf(os, "\n");
     fprintf(os, "Discovery configuration:\n");
@@ -589,11 +592,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine
__at
     while (hashMapIterator_hasNext(&iter)) {
         celix_properties_t *ep = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!");
-        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)");
         const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
-        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
-        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
+        const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)");
+        const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)");
         const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!");
         fprintf(os, "Endpoint %s:\n", uuid);
         fprintf(os, "   |- type          = %s\n", type);
@@ -612,11 +615,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine
__at
     while (hashMapIterator_hasNext(&iter)) {
         pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter);
         const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID,
"!Error!");
-        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE,
"!Error!");
+        const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE,
"(no scope)");
         const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME,
"!Error!");
         const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE,
"!Error!");
-        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER,
"!Error!");
-        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL,
"!Error!");
+        const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER,
"(no serialization)");
+        const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL,
"(no protocol)");
         const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE,
"!Error!");
         int age = (int)(now.tv_sec - entry->createTime.tv_sec);
         fprintf(os, "Endpoint %s:\n", uuid);
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
index 00ee6b4..671b874 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h
@@ -39,9 +39,4 @@
  */
 #define PUBSUB_ENDPOINT_LOCAL_VISIBILITY     "local"
 
-/**
- * Default scope, if not scope is specified endpoints are published using this scope
- */
-#define PUBSUB_DEFAULT_ENDPOINT_SCOPE        "default"
-
 #endif /* PUBSUB_CONSTANTS_H_ */
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index 2aa052d..540ecda 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -65,8 +65,6 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char*
fwUUID,
 
     if (scope != NULL) {
         celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
-    } else {
-        celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE);
     }
 
     if (topic != NULL) {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 2b063ef..434de1e 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *cn = celix_properties_get(discovered->endpoint, "container_name",
"!Error!");
         const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID,
"!Error!");
         const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE,
"!Error!");
-        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE,
"(null)");
+        const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE,
"(no scope)");
         const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME,
"!Error!");
         const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE,
"!Error!");
-        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"!Error!");
-        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"!Error!");
+        const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"(no serialization)");
+        const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"(no protocol)");
         fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
         fprintf(os, "   |- container name = %s\n", cn);
         fprintf(os, "   |- fw uuid        = %s\n", fwuuid);
@@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID,
"!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE,
"!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"!Error!");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"(no serialization)");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"(no protocol)");
         fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" :
entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
@@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t
         }
         const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID,
"!Error!");
         const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE,
"!Error!");
-        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"!Error!");
-        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"!Error!");
+        const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER,
"(no serialization)");
+        const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL,
"(no protocol)");
         fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(null)" : entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? "(no scope)" :
entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);


Mime
View raw message