celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [celix] branch develop updated: Added option for custom msgId (#38)
Date Sun, 01 Sep 2019 14:05:12 GMT
This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new 87c9681  Added option for custom msgId (#38)
87c9681 is described below

commit 87c9681c0dcafbe4b50fea5b1b3f7ef65c06cae6
Author: Roy Lenferink <lenferinkroy@gmail.com>
AuthorDate: Sun Sep 1 16:05:05 2019 +0200

    Added option for custom msgId (#38)
---
 .../pubsub/msg_descriptors/msg_poi2.descriptor     |  1 +
 .../private/include/pubsub_publisher_private.h     | 20 ++---
 .../publisher/private/src/ps_pub_activator.c       |  6 +-
 .../publisher/private/src/pubsub_publisher.c       | 91 ++++++++++++----------
 .../private/include/pubsub_subscriber_private.h    | 10 +--
 .../subscriber/private/src/ps_sub_activator.c      | 12 +--
 .../subscriber/private/src/pubsub_subscriber.c     | 12 ++-
 .../pubsub_admin_tcp/src/pubsub_tcp_common.c       | 10 +--
 .../pubsub_admin_tcp/src/pubsub_tcp_common.h       |  1 -
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 26 ++++---
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_common.c  |  5 --
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_common.h  |  2 -
 .../src/pubsub_udpmc_topic_receiver.c              |  4 +-
 .../src/pubsub_udpmc_topic_sender.c                | 16 ++++
 .../src/pubsub_websocket_common.c                  |  5 --
 .../src/pubsub_websocket_common.h                  |  1 -
 .../src/pubsub_websocket_topic_sender.c            | 11 ++-
 .../pubsub_admin_zmq/src/pubsub_zmq_common.c       |  5 --
 .../pubsub_admin_zmq/src/pubsub_zmq_common.h       |  1 -
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 13 +++-
 .../pubsub/pubsub_api/include/pubsub/publisher.h   |  5 +-
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |  1 -
 .../src/pubsub_serializer_impl.c                   | 74 ++++++++++++------
 .../src/pubsub_serializer_impl.h                   |  2 +-
 bundles/pubsub/pubsub_spi/CMakeLists.txt           |  2 +-
 bundles/pubsub/pubsub_spi/include/pubsub_utils.h   |  2 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c       | 20 ++---
 27 files changed, 197 insertions(+), 161 deletions(-)

diff --git a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
index 5d9504a..32870e8 100644
--- a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
@@ -4,6 +4,7 @@ name=poi2
 version=1.0.0
 :annotations
 classname=org.example.PointOfInterest
+msgId=5555
 :types
 location={DD lat lon}
 :message
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
index 46d6cb7..113cebb 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
+++ b/bundles/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h
@@ -31,21 +31,21 @@ struct pubsub_sender {
     long bundleId;
     bool stop;
 };
+typedef struct pubsub_sender pubsub_sender_t;
 
-typedef struct pubsub_sender * pubsub_sender_pt;
-
-typedef struct send_thread_struct{
-    pubsub_publisher_pt service;
-    pubsub_sender_pt publisher;
+struct send_thread_struct {
+    pubsub_publisher_t *service;
+    pubsub_sender_t *publisher;
     const char *topic;
-} *send_thread_struct_pt;
+};
+typedef struct send_thread_struct send_thread_struct_t;
 
-pubsub_sender_pt publisher_create(array_list_pt trackers, const char* ident,long bundleId);
+pubsub_sender_t* publisher_create(array_list_pt trackers, const char* ident,long bundleId);
 
-void publisher_start(pubsub_sender_pt client);
-void publisher_stop(pubsub_sender_pt client);
+void publisher_start(pubsub_sender_t *client);
+void publisher_stop(pubsub_sender_t *client);
 
-void publisher_destroy(pubsub_sender_pt client);
+void publisher_destroy(pubsub_sender_t *client);
 
 void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props);
 void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props);
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
index 9b24e8d..ef48b9c 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c
@@ -33,19 +33,19 @@ static const char * PUB_TOPICS[] = {
 };
 
 struct publisherActivator {
-    pubsub_sender_pt client;
+    pubsub_sender_t *client;
     array_list_pt trackerList;//List<service_tracker_pt>
 };
 
 static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx) {
     const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-    if (fwUUID==NULL) {
+    if (fwUUID == NULL) {
         printf("PUBLISHER: Cannot retrieve fwUUID.\n");
         return CELIX_INVALID_BUNDLE_CONTEXT;
     }
 
 
-    bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+    celix_bundle_t *bnd = celix_bundleContext_getBundle(ctx);
     long bundleId = celix_bundle_getId(bnd);
 
     act->trackerList = celix_arrayList_create();
diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 0a726a2..ec0a241 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -37,94 +37,96 @@
 #include "pubsub_publisher_private.h"
 
 static double randCoordinate(double min, double max) {
-
-    double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min))) ;
-
+    double ret = min + (((double)random()) / (((double)RAND_MAX)/(max-min)));
     return ret;
-
 }
 
 static void* send_thread(void* arg) {
+    send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
 
-    send_thread_struct_pt st_struct = (send_thread_struct_pt)arg;
-
-    pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service;
-    pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher;
+    pubsub_publisher_t *publish_svc = st_struct->service;
+    pubsub_sender_t *publisher = st_struct->publisher;
 
     char fwUUID[9];
-    memset(fwUUID,0,9);
-    memcpy(fwUUID,publisher->ident,8);
+    memset(fwUUID, 0, 9);
+    memcpy(fwUUID, publisher->ident, 8);
 
     //poi_t point = calloc(1,sizeof(*point));
-    location_t place = calloc(1,sizeof(*place));
+    location_t place = calloc(1, sizeof(*place));
 
-    char* desc = calloc(64,sizeof(char));
-    snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+    char *desc = calloc(64, sizeof(char));
+    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
 
-    char* name = calloc(64,sizeof(char));
-    snprintf(name,64,"Bundle#%ld",publisher->bundleId);
+    char *name = calloc(64, sizeof(char));
+    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
 
     place->name = name;
     place->description = desc;
     place->extra = "extra value";
-    printf("TOPIC : %s\n",st_struct->topic);
+    printf("TOPIC : %s\n", st_struct->topic);
+
     unsigned int msgId = 0;
-    if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0) {
 
-        while (publisher->stop == false) {
-            place->position.lat = randCoordinate(MIN_LAT,MAX_LAT);
-            place->position.lon = randCoordinate(MIN_LON,MAX_LON);
-            int nr_char = (int)randCoordinate(5,100000);
+    while (publisher->stop == false) {
+        if (msgId == 0) {
+            if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle, st_struct->topic, &msgId) != 0) {
+                printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n", MSG_POI_NAME);
+            }
+        }
+
+        if (msgId > 0) {
+            place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
+            place->position.lon = randCoordinate(MIN_LON, MAX_LON);
+            int nr_char = (int) randCoordinate(5, 100000);
             place->data = calloc(nr_char, 1);
-            for (int i = 0; i < (nr_char-1); i++) {
-                place->data[i] = i%10 + '0';
+            for (int i = 0; i < (nr_char - 1); i++) {
+                place->data[i] = i % 10 + '0';
             }
-            place->data[nr_char-1] = '\0';
+            place->data[nr_char - 1] = '\0';
             if (publish_svc->send) {
-                if (publish_svc->send(publish_svc->handle, msgId,place) == 0) {
-                    printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char);
+                if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                    printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                           place->position.lat, place->position.lon, place->name, place->description, nr_char);
                 }
             } else {
                 printf("No send for %s\n", st_struct->topic);
             }
 
             free(place->data);
-            sleep(2);
         }
-    } else {
-        printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME);
+        sleep(2);
     }
+
     free(place->description);
     free(place->name);
     free(place);
 
     free(st_struct);
 
-
     return NULL;
-
 }
 
-pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
-    pubsub_sender_pt publisher = malloc(sizeof(*publisher));
+pubsub_sender_t* publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
+    pubsub_sender_t *publisher = malloc(sizeof(*publisher));
 
     publisher->trackers = trackers;
     publisher->ident = ident;
     publisher->bundleId = bundleId;
     publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
     publisher->stop = false;
+
     return publisher;
 }
 
-void publisher_start(pubsub_sender_pt client) {
+void publisher_start(pubsub_sender_t *client) {
     printf("PUBLISHER: starting up...\n");
 }
 
-void publisher_stop(pubsub_sender_pt client) {
+void publisher_stop(pubsub_sender_t *client) {
     printf("PUBLISHER: stopping...\n");
 }
 
-void publisher_destroy(pubsub_sender_pt client) {
+void publisher_destroy(pubsub_sender_t *client) {
     hashMap_destroy(client->tid_map, false, false);
     client->trackers = NULL;
     client->ident = NULL;
@@ -132,29 +134,32 @@ void publisher_destroy(pubsub_sender_pt client) {
 }
 
 void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
-    pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)svc;
-    pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+    pubsub_publisher_t *publish_svc = (pubsub_publisher_t *) svc;
+    pubsub_sender_t *manager = (pubsub_sender_t *) handle;
     manager->stop = false;
-    printf("PUBLISHER: new publish service exported (%s).\n",manager->ident);
-    send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct));
+
+    printf("PUBLISHER: new publish service exported (%s).\n", manager->ident);
+
+    send_thread_struct_t *data = calloc(1, sizeof(*data));
     data->service = publish_svc;
     data->publisher = manager;
     data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
     celix_thread_t *tid = malloc(sizeof(*tid));
-    celixThread_create(tid,NULL,send_thread,(void*)data);
+    celixThread_create(tid, NULL, send_thread, (void*)data);
     hashMap_put(manager->tid_map, publish_svc, tid);
 }
 
 void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
-    pubsub_sender_pt manager = (pubsub_sender_pt)handle;
+    pubsub_sender_t *manager = (pubsub_sender_t *) handle;
     celix_thread_t *tid = hashMap_get(manager->tid_map, svc);
     manager->stop = true;
+
 #if defined(__APPLE__) && defined(__MACH__)
     uint64_t threadid;
     pthread_threadid_np(tid->thread, &threadid);
     printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid);
 #else
-    printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread);
+    printf("PUBLISHER: publish service unexporting (%s) %li!\n", manager->ident, tid->thread);
 #endif
 
     celixThread_join(*tid,NULL);
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
index 2aa93a6..107325b 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h
@@ -38,12 +38,12 @@ struct pubsub_receiver {
     char *name;
 };
 
-typedef struct pubsub_receiver* pubsub_receiver_pt;
+typedef struct pubsub_receiver pubsub_receiver_t;
 
-pubsub_receiver_pt subscriber_create(char* topics);
-void subscriber_start(pubsub_receiver_pt client);
-void subscriber_stop(pubsub_receiver_pt client);
-void subscriber_destroy(pubsub_receiver_pt client);
+pubsub_receiver_t* subscriber_create(char* topics);
+void subscriber_start(pubsub_receiver_t* client);
+void subscriber_stop(pubsub_receiver_t* client);
+void subscriber_destroy(pubsub_receiver_t* client);
 
 int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release);
 
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
index 24ebf36..315ebc6 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c
@@ -40,7 +40,7 @@ static const char * SUB_TOPICS[] = {
 
 struct subscriberActivator {
     array_list_pt registrationList; //List<service_registration_pt>
-    pubsub_subscriber_pt subsvc;
+    pubsub_subscriber_t *subsvc;
 };
 
 celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
@@ -53,8 +53,8 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
     struct subscriberActivator * act = (struct subscriberActivator *) userData;
 
-    pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc));
-    pubsub_receiver_pt sub = subscriber_create(SUB_NAME);
+    pubsub_subscriber_t *subsvc = calloc(1,sizeof(*subsvc));
+    pubsub_receiver_t *sub = subscriber_create(SUB_NAME);
     subsvc->handle = sub;
     subsvc->receive = pubsub_subscriber_recv;
 
@@ -76,7 +76,7 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
         arrayList_add(act->registrationList,reg);
     }
 
-    subscriber_start((pubsub_receiver_pt)act->subsvc->handle);
+    subscriber_start((pubsub_receiver_t *) act->subsvc->handle);
 
     return CELIX_SUCCESS;
 }
@@ -91,7 +91,7 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context)
 
     }
 
-    subscriber_stop((pubsub_receiver_pt)act->subsvc->handle);
+    subscriber_stop((pubsub_receiver_t *) act->subsvc->handle);
 
     return CELIX_SUCCESS;
 }
@@ -101,7 +101,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex
     struct subscriberActivator * act = (struct subscriberActivator *) userData;
 
     act->subsvc->receive = NULL;
-    subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle);
+    subscriber_destroy((pubsub_receiver_t *) act->subsvc->handle);
     act->subsvc->handle = NULL;
     free(act->subsvc);
     act->subsvc = NULL;
diff --git a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
index 2303fea..2474c50 100644
--- a/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
+++ b/bundles/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c
@@ -30,22 +30,22 @@
 #include "poi.h"
 #include "pubsub_subscriber_private.h"
 
-pubsub_receiver_pt subscriber_create(char* topics) {
-    pubsub_receiver_pt sub = calloc(1,sizeof(*sub));
+pubsub_receiver_t* subscriber_create(char* topics) {
+    pubsub_receiver_t *sub = calloc(1,sizeof(*sub));
     sub->name = strdup(topics);
     return sub;
 }
 
 
-void subscriber_start(pubsub_receiver_pt subscriber) {
+void subscriber_start(pubsub_receiver_t *subscriber) {
     printf("Subscriber started...\n");
 }
 
-void subscriber_stop(pubsub_receiver_pt subscriber) {
+void subscriber_stop(pubsub_receiver_t *subscriber) {
     printf("Subscriber stopped...\n");
 }
 
-void subscriber_destroy(pubsub_receiver_pt subscriber) {
+void subscriber_destroy(pubsub_receiver_t *subscriber) {
     if (subscriber->name != NULL) {
         free(subscriber->name);
     }
@@ -54,10 +54,8 @@ void subscriber_destroy(pubsub_receiver_pt subscriber) {
 }
 
 int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
-
     location_t place = (location_t)msg;
     printf("Recv (%s): [%f, %f] (%s, %s, %s, len data %li)\n", msgType, place->position.lat, place->position.lon, place->name, place->description, place->extra, (long)(strlen(place->data) + 1));
 
     return 0;
-
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
index 28d7f74..fcd45cf 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
@@ -17,18 +17,16 @@
  * under the License.
  */
 
+#include "pubsub_tcp_common.h"
+
 #include <memory.h>
 #include <assert.h>
 #include <string.h>
 #include <stdio.h>
+#include <stdint.h>
 #include <unistd.h>
-#include "pubsub_psa_tcp_constants.h"
-#include "pubsub_tcp_common.h"
 
-int psa_tcp_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
-    *msgTypeId = utils_stringHash(msgType);
-    return 0;
-}
+#include "pubsub_psa_tcp_constants.h"
 
 bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t *hdr) {
     bool check=false;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index b6f4d5a..feb97ca 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -46,7 +46,6 @@ typedef struct pubsub_tcp_endPointStore{
  */
 
 
-int psa_tcp_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 void psa_tcp_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
 bool psa_tcp_checkVersion(version_pt msgVersion, const pubsub_tcp_msg_header_t *hdr);
 void psa_tcp_setupTcpContext(log_helper_t *logHelper, celix_thread_t *thread, const celix_properties_t *topicProperties);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 79eaca1..83ff981 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -101,22 +101,17 @@ typedef struct psa_tcp_bounded_service_entry {
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgTypeIds; // key = msg name, value = msg type id
     hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
     int getCount;
 } psa_tcp_bounded_service_entry_t;
 
-static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                         const celix_properties_t *svcProperties);
-
-static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle,
-                                          const celix_properties_t *svcProperties);
-
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId);
+static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
+static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
-
 static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender);
-
 static void *psa_tcp_sendThread(void *data);
-
 static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg);
 
 pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
@@ -326,6 +321,12 @@ void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, con
     //TODO
 }
 
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+    psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
+    *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds, msgType);
+    return 0;
+}
+
 static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
                                          const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_tcp_topic_sender_t *sender = handle;
@@ -341,9 +342,9 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
         entry->parent = sender;
         entry->bndId = bndId;
         entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+        entry->msgTypeIds = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t *) requestingBundle,
-                                                         &entry->msgTypes);
+        int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t *) requestingBundle, &entry->msgTypes);
         if (rc == 0) {
             hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
             while (hashMapIterator_hasNext(&iter)) {
@@ -361,6 +362,7 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req
                 uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
                 celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
                 hashMap_put(entry->msgEntries, key, sendEntry);
+                hashMap_put(entry->msgTypeIds, strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) sendEntry->msgSer->msgId);
             }
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
@@ -400,6 +402,8 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re
             free(msgEntry);
         }
         hashMap_destroy(entry->msgEntries, false, false);
+
+        hashMap_destroy(entry->msgTypeIds, true, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
index e75599d..7073b4f 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.c
@@ -19,11 +19,6 @@
 
 #include "pubsub_udpmc_common.h"
 
-int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
-    *msgTypeId = utils_stringHash(msgType);
-    return 0;
-}
-
 bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr) {
     bool check = false;
 
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
index 7528f4e..534a2ed 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_common.h
@@ -31,8 +31,6 @@ typedef struct pubsub_udp_msg_header {
 } pubsub_udp_msg_header_t;
 
 
-int psa_udpmc_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
-
 bool psa_udpmc_checkVersion(version_pt msgVersion, pubsub_udp_msg_header_t *hdr);
 
 
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
index fd46062..e26fc31 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c
@@ -458,8 +458,8 @@ static void psa_udpmc_processMsg(pubsub_udpmc_topic_receiver_t *receiver, pubsub
 
             } else {
                 int major = 0, minor = 0;
-                version_getMajor(msgSer->msgVersion,&major);
-                version_getMinor(msgSer->msgVersion,&minor);
+                version_getMajor(msgSer->msgVersion, &major);
+                version_getMinor(msgSer->msgVersion, &minor);
                 printf("[PSA_UDPMC] Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
                        msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
             }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 12dfc30..1023005 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@ -66,6 +66,7 @@ typedef struct psa_udpmc_bounded_service_entry {
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes;
+    hash_map_t *msgTypeIds;
     int getCount;
     largeUdp_pt largeUdpHandle;
 } psa_udpmc_bounded_service_entry_t;
@@ -76,6 +77,7 @@ typedef struct pubsub_msg {
     char *payload;
 } pubsub_udp_msg_t;
 
+static int psa_udpmc_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static int psa_udpmc_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg);
@@ -192,6 +194,12 @@ void pubsub_udpmcTopicSender_disconnectFrom(pubsub_udpmc_topic_sender_t *sender,
     //TODO
 }
 
+static int psa_udpmc_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
+    psa_udpmc_bounded_service_entry_t *entry = (psa_udpmc_bounded_service_entry_t *) handle;
+    *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds, msgType);
+    return 0;
+}
+
 static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_udpmc_topic_sender_t *sender = handle;
     long bndId = celix_bundle_getId(requestingBundle);
@@ -208,9 +216,16 @@ static void* psa_udpmc_getPublisherService(void *handle, const celix_bundle_t *r
         entry->parent = sender;
         entry->bndId = bndId;
         entry->largeUdpHandle = largeUdp_create(1);
+        entry->msgTypeIds = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
+            hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                pubsub_msg_serializer_t *msgSer  = hashMapIterator_nextValue(&iter);
+                hashMap_put(entry->msgTypeIds, strndup(msgSer->msgName, 1024), (void *)(uintptr_t) msgSer->msgId);
+            }
+
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_udpmc_localMsgTypeIdForMsgType;
             entry->service.send = psa_udpmc_topicPublicationSend;
@@ -244,6 +259,7 @@ static void psa_udpmc_ungetPublisherService(void *handle, const celix_bundle_t *
             fprintf(stderr, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
         }
 
+        hashMap_destroy(entry->msgTypeIds, true, false);
         largeUdp_destroy(entry->largeUdpHandle);
         free(entry);
     }
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
index ce85e40..8273c10 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.c
@@ -22,11 +22,6 @@
 #include <stdio.h>
 #include "pubsub_websocket_common.h"
 
-int psa_websocket_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
-    *msgTypeId = utils_stringHash(msgType);
-    return 0;
-}
-
 bool psa_websocket_checkVersion(version_pt msgVersion, const pubsub_websocket_msg_header_t *hdr) {
     bool check=false;
     int major=0,minor=0;
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
index 89ec65a..6ed6c83 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -46,7 +46,6 @@ struct pubsub_websocket_msg {
 
 typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
 
-int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 void psa_websocket_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
 char *psa_websocket_createURI(const char *scope, const char *topic);
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index 42f66c7..4f80829 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -93,11 +93,12 @@ typedef struct psa_websocket_bounded_service_entry {
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgTypeIds; //key = msg name, value = msg type id
     hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
     int getCount;
 } psa_websocket_bounded_service_entry_t;
 
-
+static int psa_websocket_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId);
 static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties);
 static void delay_first_send_for_late_joiners(pubsub_websocket_topic_sender_t *sender);
@@ -229,6 +230,11 @@ const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
     return sender->uri;
 }
 
+static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
+    psa_websocket_bounded_service_entry_t *entry = (psa_websocket_bounded_service_entry_t *) handle;
+    *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds, msgType);
+    return 0;
+}
 
 static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_websocket_topic_sender_t *sender = handle;
@@ -244,6 +250,7 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
         entry->parent = sender;
         entry->bndId = bndId;
         entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+        entry->msgTypeIds = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
@@ -263,6 +270,7 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_
                 uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
                 celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
                 hashMap_put(entry->msgEntries, key, sendEntry);
+                hashMap_put(entry->msgTypeIds, strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) sendEntry->msgSer->msgId);
             }
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
@@ -302,6 +310,7 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle
         }
         hashMap_destroy(entry->msgEntries, false, false);
 
+        hashMap_destroy(entry->msgTypeIds, true, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
index 145798b..4608296 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.c
@@ -21,11 +21,6 @@
 #include <assert.h>
 #include "pubsub_zmq_common.h"
 
-int psa_zmq_localMsgTypeIdForMsgType(void* handle __attribute__((unused)), const char* msgType, unsigned int* msgTypeId) {
-    *msgTypeId = utils_stringHash(msgType);
-    return 0;
-}
-
 bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr) {
     bool check=false;
     int major=0,minor=0;
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
index 6a72ce9..7918249 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_common.h
@@ -50,7 +50,6 @@ struct pubsub_zmq_msg_header {
 typedef struct pubsub_zmq_msg_header pubsub_zmq_msg_header_t;
 
 
-int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 void psa_zmq_setScopeAndTopicFilter(const char* scope, const char *topic, char *filter);
 
 bool psa_zmq_checkVersion(version_pt msgVersion, const pubsub_zmq_msg_header_t *hdr);
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 8166d4c..c7bb7ff 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -98,6 +98,7 @@ typedef struct psa_zmq_bounded_service_entry {
     pubsub_publisher_t service;
     long bndId;
     hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgTypeIds; //key = msg name, value = msg type id
     hash_map_t *msgEntries; //key = msg type id, value = psa_zmq_send_msg_entry_t
     int getCount;
 } psa_zmq_bounded_service_entry_t;
@@ -344,6 +345,12 @@ void pubsub_zmqTopicSender_disconnectFrom(pubsub_zmq_topic_sender_t *sender __at
     /*nop*/
 }
 
+static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
+    psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle;
+    *msgTypeId = (unsigned int)(uintptr_t) hashMap_get(entry->msgTypeIds, msgType);
+    return 0;
+}
+
 static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
     pubsub_zmq_topic_sender_t *sender = handle;
     long bndId = celix_bundle_getId(requestingBundle);
@@ -358,6 +365,7 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
         entry->parent = sender;
         entry->bndId = bndId;
         entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+        entry->msgTypeIds = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
         int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
@@ -377,6 +385,7 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
                 uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
                 celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
                 hashMap_put(entry->msgEntries, key, sendEntry);
+                hashMap_put(entry->msgTypeIds, strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) sendEntry->msgSer->msgId);
             }
             entry->service.handle = entry;
             entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
@@ -385,9 +394,6 @@ static void* psa_zmq_getPublisherService(void *handle, const celix_bundle_t *req
         } else {
             L_ERROR("Error creating serializer map for ZMQ TopicSender %s/%s", sender->scope, sender->topic);
         }
-
-
-
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
@@ -419,6 +425,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re
         }
         hashMap_destroy(entry->msgEntries, false, false);
 
+        hashMap_destroy(entry->msgTypeIds, true, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
index 96483d8..18eaf56 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -43,7 +43,7 @@
  
 struct pubsub_publisher {
     void *handle;
- 
+
     /**
      * Every msg is identifiable by msg type string. Because msg type string are performance wise not preferable (string compares),
      * a "local" (int / platform dependent) unique id will be generated runtime
@@ -54,7 +54,7 @@ struct pubsub_publisher {
      * Returns 0 on success.
      */
     int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, unsigned int *msgTypeId);
-  
+
     /**
      * send is a async function, but the msg can be safely deleted after send returns.
      * Returns 0 on success.
@@ -63,6 +63,5 @@ struct pubsub_publisher {
  
 };
 typedef struct pubsub_publisher pubsub_publisher_t;
-typedef struct pubsub_publisher* pubsub_publisher_pt;
 
 #endif // __PUBSUB_PUBLISHER_H_
diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
index 3debad9..7daf5d2 100644
--- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
+++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -66,7 +66,6 @@ struct pubsub_subscriber_struct {
 
 };
 typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
-typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt;
 
 
 #endif //  __PUBSUB_SUBSCRIBER_H_
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
index 5637937..618f9a4 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.c
@@ -67,12 +67,12 @@ typedef struct pubsub_json_msg_serializer_impl {
     version_pt msgVersion;
 } pubsub_json_msg_serializer_impl_t;
 
-static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle);
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle);
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers);
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers,celix_bundle_t *bundle);
 
-static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer);
-static int pubsubMsgSerializer_convert_avpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn);
+static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer);
+static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn);
 
 static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
     va_list ap;
@@ -121,7 +121,7 @@ celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer) {
     return status;
 }
 
-celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap) {
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, celix_bundle_t *bundle, hash_map_pt* serializerMap) {
     celix_status_t status = CELIX_SUCCESS;
     pubsub_json_serializer_t *serializer = handle;
 
@@ -207,7 +207,7 @@ void pubsubMsgSerializer_freeMsg(void* handle, void *msg) {
 }
 
 
-static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
+static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, celix_bundle_t *bundle) {
     char* root = NULL;
     char* metaInfPath = NULL;
 
@@ -224,8 +224,7 @@ static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bu
     }
 }
 
-static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
-{
+static char* pubsubSerializer_getMsgDescriptionDir(celix_bundle_t *bundle) {
     char *root = NULL;
 
     bool isSystemBundle = false;
@@ -251,9 +250,7 @@ static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
     return root;
 }
 
-
-static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
-{
+static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, celix_bundle_t *bundle, hash_map_pt msgSerializers) {
     char fqn[MAX_PATH_LEN];
     char path[MAX_PATH_LEN];
     const char* entry_name = NULL;
@@ -282,10 +279,10 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
 
         int translation_result = -1;
         if (fileInputType == FIT_DESCRIPTOR) {
-            translation_result = pubsubMsgSerializer_convert_descriptor(stream, msgSerializer);
+            translation_result = pubsubMsgSerializer_convertDescriptor(stream, msgSerializer);
         }
         else if (fileInputType == FIT_AVPR) {
-            translation_result = pubsubMsgSerializer_convert_avpr(stream, msgSerializer, fqn);
+            translation_result = pubsubMsgSerializer_convertAvpr(stream, msgSerializer, fqn);
         }
         fclose(stream);
 
@@ -317,6 +314,7 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
         closedir(dir);
     }
 }
+
 static FILE* openFileStream(FILE_INPUT_TYPE file_input_type, const char* filename, const char* root, char* avpr_fqn, char* path) {
     FILE* result = NULL;
     memset(path, 0, MAX_PATH_LEN);
@@ -394,15 +392,15 @@ static bool readPropertiesFile(const char* properties_file_name, const char* roo
     return true;
 }
 
-static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer) {
-    dyn_message_type* msgType = NULL;
+static int pubsubMsgSerializer_convertDescriptor(FILE* file_ptr, pubsub_msg_serializer_t* serializer) {
+    dyn_message_type *msgType = NULL;
     int rc = dynMessage_parse(file_ptr, &msgType);
     if (rc != 0 || msgType == NULL) {
         printf("DMU: cannot parse message from descriptor.\n");
         return -1;
     }
 
-    char* msgName = NULL;
+    char *msgName = NULL;
     rc += dynMessage_getName(msgType, &msgName);
 
     version_pt msgVersion = NULL;
@@ -413,11 +411,25 @@ static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr, pubsub_msg_ser
         return -1;
     }
 
-    dyn_type * type = NULL;
+    dyn_type *type = NULL;
     dynMessage_getMessageType(msgType, &type);
 
-    unsigned int msgId = utils_stringHash(msgName);
-    pubsub_json_msg_serializer_impl_t * handle = (pubsub_json_msg_serializer_impl_t*)serializer->handle;
+    unsigned int msgId = 0;
+
+    char *msgIdStr = NULL;
+    int rv = dynMessage_getAnnotationEntry(msgType, "msgId", &msgIdStr);
+    if (rv == CELIX_SUCCESS && msgIdStr != NULL) {
+        // custom msg id passed, use it
+        long customMsgId = strtol(msgIdStr, NULL, 10);
+        if (customMsgId > 0)
+            msgId = (unsigned int) customMsgId;
+    }
+
+    if (msgId == 0) {
+        msgId = utils_stringHash(msgName);
+    }
+
+    pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*)serializer->handle;
     handle->type = type;
     handle->msgId = msgId;
     handle->msgName = msgName;
@@ -434,16 +446,16 @@ static int pubsubMsgSerializer_convert_descriptor(FILE* file_ptr, pubsub_msg_ser
     return 0;
 }
 
-static int pubsubMsgSerializer_convert_avpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn) {
+static int pubsubMsgSerializer_convertAvpr(FILE* file_ptr, pubsub_msg_serializer_t* serializer, const char* fqn) {
     if (!file_ptr || !fqn || !serializer) return -2;
-    dyn_type* type = dynType_parseAvpr(file_ptr, fqn);
+    dyn_type *type = dynType_parseAvpr(file_ptr, fqn);
 
     if (!type) {
         printf("DMU: cannot parse avpr file for '%s'\n", fqn);
         return -1;
     }
 
-    const char* msgName = dynType_getName(type);
+    const char *msgName = dynType_getName(type);
 
     version_pt msgVersion = NULL;
     celix_status_t s = version_createVersionFromString(dynType_getMetaInfo(type, "version"), &msgVersion);
@@ -456,8 +468,20 @@ static int pubsubMsgSerializer_convert_avpr(FILE* file_ptr, pubsub_msg_serialize
         return -1;
     }
 
-    unsigned int msgId = utils_stringHash(msgName);
-    pubsub_json_msg_serializer_impl_t * handle = (pubsub_json_msg_serializer_impl_t*) serializer->handle;
+    unsigned int msgId = 0;
+    const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+    if (msgIdStr != NULL) {
+        // custom msg id passed, use it
+        long customMsgId = strtol(msgIdStr, NULL, 10);
+        if (customMsgId > 0)
+            msgId = (unsigned int) customMsgId;
+    }
+
+    if (msgId == 0) {
+        msgId = utils_stringHash(msgName);
+    }
+
+    pubsub_json_msg_serializer_impl_t *handle = (pubsub_json_msg_serializer_impl_t*) serializer->handle;
     handle->type = type;
     handle->msgId = msgId;
     handle->msgName = msgName;
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
index f44f6b2..9a850f2 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_serializer_impl.h
@@ -34,7 +34,7 @@ typedef struct pubsub_json_serializer pubsub_json_serializer_t;
 celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_json_serializer_t **serializer);
 celix_status_t pubsubSerializer_destroy(pubsub_json_serializer_t* serializer);
 
-celix_status_t pubsubSerializer_createSerializerMap(void *handle, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_createSerializerMap(void *handle, celix_bundle_t *bundle, hash_map_pt* serializerMap);
 celix_status_t pubsubSerializer_destroySerializerMap(void *handle, hash_map_pt serializerMap);
 
 #endif /* PUBSUB_SERIALIZER_JSON_H_ */
diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt
index 509c760..ffb9625 100644
--- a/bundles/pubsub/pubsub_spi/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt
@@ -27,7 +27,7 @@ add_library(pubsub_spi STATIC
         src/pubsub_utils_match.c
         src/pubsub_endpoint.c
         src/pubsub_utils.c
-            src/pubsub_admin_metrics.c
+        src/pubsub_admin_metrics.c
 )
 target_include_directories(pubsub_spi SYSTEM PRIVATE ${UUID_INCLUDE_DIRS})
 set_target_properties(pubsub_spi PROPERTIES OUTPUT_NAME "celix_pubsub_spi")
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index 7c80745..39c1841 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -40,7 +40,7 @@ extern "C" {
  */
 celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topic, char **scope);
 
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx);
 
 double pubsub_utils_matchPublisher(
         celix_bundle_context_t *ctx,
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
index b6ed882..4689076 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -89,8 +89,7 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi
  *
  * Caller is responsible for freeing the object
  */
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
-{
+char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx) {
     array_list_pt bundles = NULL;
     bundleContext_getBundles(ctx, &bundles);
     int nrOfBundles = arrayList_size(bundles);
@@ -98,7 +97,7 @@ char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
     char* result = NULL;
 
     for (int i = 0; i < nrOfBundles; i++) {
-        bundle_pt b = arrayList_get(bundles, i);
+        celix_bundle_t *b = arrayList_get(bundles, i);
 
         /* Skip bundle 0 (framework bundle) since it has no path nor revisions */
         bundle_getBundleId(b, &bundle_id);
@@ -130,23 +129,20 @@ char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
 }
 
 celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher) {
-
     celix_properties_t *topic_props = NULL;
 
     bool isSystemBundle = false;
-    bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
+    bundle_isSystemBundle((celix_bundle_t *)bundle, &isSystemBundle);
     long bundleId = -1;
-    bundle_isSystemBundle((bundle_pt)bundle, &isSystemBundle);
-    bundle_getBundleId((bundle_pt)bundle,&bundleId);
+    bundle_isSystemBundle((celix_bundle_t *)bundle, &isSystemBundle);
+    bundle_getBundleId((celix_bundle_t *)bundle,&bundleId);
 
     if (isSystemBundle == false) {
-
         char *bundleRoot = NULL;
-        char* topicPropertiesPath = NULL;
-        bundle_getEntry((bundle_pt)bundle, ".", &bundleRoot);
+        char *topicPropertiesPath = NULL;
+        bundle_getEntry((celix_bundle_t *)bundle, ".", &bundleRoot);
 
         if (bundleRoot != NULL) {
-
             asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher? "pub":"sub", topic);
             topic_props = celix_properties_load(topicPropertiesPath);
             if (topic_props == NULL) {
@@ -159,4 +155,4 @@ celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle
     }
 
     return topic_props;
-}
\ No newline at end of file
+}


Mime
View raw message