celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [celix] 02/02: Adds a stop service export thread for rsa to prevent deadlocks
Date Tue, 08 Sep 2020 18:50:13 GMT
This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch hotfix/rsa_deadlock
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 5bf1426256ea53a6cc68c9a861c37cb62682d08c
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
AuthorDate: Tue Sep 8 20:49:12 2020 +0200

    Adds a stop service export thread for rsa to prevent deadlocks
---
 .../src/export_registration_dfi.c                  | 24 +++---
 .../src/export_registration_dfi.h                  |  2 +-
 .../src/remote_service_admin_dfi.c                 | 94 +++++++++++++++++++++-
 3 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c
index 39b43b3..5afa1a3 100644
--- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c
+++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.c
@@ -19,7 +19,6 @@
 
 #include <jansson.h>
 #include <dyn_interface.h>
-#include <json_serializer.h>
 #include <remote_constants.h>
 #include <remote_service_admin.h>
 #include <service_tracker_customizer.h>
@@ -44,6 +43,7 @@ struct export_registration {
 
 
     celix_thread_mutex_t mutex;
+    bool active; //protected by mutex
     void *service; //protected by mutex
     long trackerId; //protected by mutex
 
@@ -86,6 +86,7 @@ celix_status_t exportRegistration_create(celix_log_helper_t *helper, service_ref
         reg->logFile = logFile;
         reg->servId = strndup(servId, 1024);
         reg->trackerId = -1L;
+        reg->active = true;
 
         remoteInterceptorsHandler_create(context, &reg->interceptorsHandler);
 
@@ -139,8 +140,11 @@ celix_status_t exportRegistration_call(export_registration_t *export, char *data
             bool cont = remoteInterceptorHandler_invokePreExportCall(export->interceptorsHandler, export->exportReference.endpoint->properties, sig, &metadata);
             if (cont) {
                 celixThreadMutex_lock(&export->mutex);
-                if (export->service != NULL) {
+                if (export->active && export->service != NULL) {
                     status = jsonRpc_call(export->intf, export->service, data, responseOut);
+                } else if (!export->active) {
+                    status = CELIX_ILLEGAL_STATE;
+                    celix_logHelper_warning(export->helper, "Cannot call an inactive service export");
                 } else {
                     status = CELIX_ILLEGAL_STATE;
                     celix_logHelper_error(export->helper, "export service pointer is NULL");
@@ -245,7 +249,7 @@ celix_status_t exportRegistration_start(export_registration_t *reg) {
     celixThreadMutex_unlock(&reg->mutex);
 
     if (prevTrkId >= 0) {
-        celix_logHelper_error(reg->helper, "Error staring export registration. The export registration already has an active service tracker");
+        celix_logHelper_error(reg->helper, "Error starting export registration. The export registration already had an active service tracker");
         celix_bundleContext_stopTracker(reg->context, prevTrkId);
     }
 
@@ -268,6 +272,12 @@ celix_status_t exportRegistration_stop(export_registration_t *reg) {
     return status;
 }
 
+void exportRegistration_setActive(export_registration_t *reg, bool active) {
+    celixThreadMutex_lock(&reg->mutex);
+    reg->active = active;
+    celixThreadMutex_unlock(&reg->mutex);
+}
+
 static void exportRegistration_addServ(void *data, void *service) {
     export_registration_t *reg = data;
     celixThreadMutex_lock(&reg->mutex);
@@ -284,14 +294,6 @@ static void exportRegistration_removeServ(void *data, void *service) {
     celixThreadMutex_unlock(&reg->mutex);
 }
 
-
-celix_status_t exportRegistration_close(export_registration_t *reg) {
-    celix_status_t status = CELIX_SUCCESS;
-    exportRegistration_stop(reg);
-    return status;
-}
-
-
 celix_status_t exportRegistration_getException(export_registration_t *registration) {
     celix_status_t status = CELIX_SUCCESS;
     //TODO
diff --git a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h
index 2333c4c..8bc40eb 100644
--- a/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h
+++ b/bundles/remote_services/remote_service_admin_dfi/src/export_registration_dfi.h
@@ -26,11 +26,11 @@
 #include "endpoint_description.h"
 
 celix_status_t exportRegistration_create(celix_log_helper_t *helper, service_reference_pt reference, endpoint_description_t *endpoint, celix_bundle_context_t *context, FILE *logFile, export_registration_t **registration);
-celix_status_t exportRegistration_close(export_registration_t *registration);
 void exportRegistration_destroy(export_registration_t *registration);
 
 celix_status_t exportRegistration_start(export_registration_t *registration);
 celix_status_t exportRegistration_stop(export_registration_t *registration);
+void exportRegistration_setActive(export_registration_t *registration, bool active);
 
 celix_status_t exportRegistration_call(export_registration_t *export, char *data, int datalength, celix_properties_t *metadata, char **response, int *responseLength);
 
diff --git a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c
index f690a31..53620df 100644
--- a/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c
+++ b/bundles/remote_services/remote_service_admin_dfi/src/remote_service_admin_dfi.c
@@ -19,6 +19,7 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <unistd.h>
 
 #include <arpa/inet.h>
 #include <netdb.h>
@@ -56,6 +57,16 @@
 #define RSA_LOG_DEBUG(admin, msg, ...) \
     celix_logHelper_log((admin)->loghelper, CELIX_LOG_LEVEL_ERROR, (msg),  ##__VA_ARGS__)
 
+
+/**
+ * If set to true the rsa will create a thread to handle stopping of service export.
+ *
+ * This stop thread can be removed when the branch feature/async_svc_registration is merged and
+ * celix_bundleContext_stopTrackerAsync is available.
+ *
+ */
+#define CELIX_RSA_USE_STOP_EXPORT_THREAD false
+
 struct remote_service_admin {
     celix_bundle_context_t *context;
     celix_log_helper_t *loghelper;
@@ -63,6 +74,13 @@ struct remote_service_admin {
     celix_thread_rwlock_t exportedServicesLock;
     hash_map_pt exportedServices;
 
+    //NOTE stopExportsMutex, stopExports, stopExportsActive, stopExportsCond and stopExportsThread are only used if CELIX_RSA_USE_STOP_EXPORT_THREAD is set to true
+    celix_thread_mutex_t stopExportsMutex;
+    celix_array_list_t *stopExports;
+    bool stopExportsActive;
+    celix_thread_cond_t stopExportsCond;
+    celix_thread_t stopExportsThread;
+
     celix_thread_mutex_t importedServicesLock;
     array_list_pt importedServices;
 
@@ -110,6 +128,8 @@ static celix_status_t remoteServiceAdmin_getIpAddress(char* interface, char** ip
 static size_t remoteServiceAdmin_readCallback(void *ptr, size_t size, size_t nmemb, void *userp);
 static size_t remoteServiceAdmin_write(void *contents, size_t size, size_t nmemb, void *userp);
 static void remoteServiceAdmin_log(remote_service_admin_t *admin, int level, const char *file, int line, const char *msg, ...);
+static void remoteServiceAdmin_setupStopExportsThread(remote_service_admin_t* admin);
+static void remoteServiceAdmin_teardownStopExportsThread(remote_service_admin_t* admin);
 
 static void remoteServiceAdmin_curlshare_lock(CURL *handle, curl_lock_data data, curl_lock_access laccess, void *userptr)
 {
@@ -220,6 +240,8 @@ celix_status_t remoteServiceAdmin_create(celix_bundle_context_t *context, remote
             status = EPERM;
         }
 
+        remoteServiceAdmin_setupStopExportsThread(*admin);
+
         // Prepare callbacks structure. We have only one callback, the rest are NULL.
         struct mg_callbacks callbacks;
         memset(&callbacks, 0, sizeof(callbacks));
@@ -285,6 +307,52 @@ celix_status_t remoteServiceAdmin_destroy(remote_service_admin_t **admin)
     return status;
 }
 
+void* remoteServiceAdmin_stopExportsThread(void *data) {
+    remote_service_admin_t* admin = data;
+    bool active = true;
+
+    while (active) {
+        celixThreadMutex_lock(&admin->stopExportsMutex);
+        if (admin->stopExportsActive && celix_arrayList_size(admin->stopExports) == 0) {
+            celixThreadCondition_timedwaitRelative(&admin->stopExportsCond, &admin->stopExportsMutex, 1, 0);
+        }
+        for (int i = 0; i < celix_arrayList_size(admin->stopExports); ++i) {
+            export_registration_t *export = celix_arrayList_get(admin->stopExports, i);
+            exportRegistration_stop(export);
+            exportRegistration_destroy(export);
+        }
+        celix_arrayList_clear(admin->stopExports);
+        active = admin->stopExportsActive;
+        celixThreadMutex_unlock(&admin->stopExportsMutex);
+    }
+
+    return NULL;
+}
+
+static void remoteServiceAdmin_setupStopExportsThread(remote_service_admin_t* admin) {
+    if (CELIX_RSA_USE_STOP_EXPORT_THREAD) {
+        //setup exports stop thread
+        celixThreadMutex_create(&admin->stopExportsMutex, NULL);
+        admin->stopExports = celix_arrayList_create();
+        celixThreadCondition_init(&admin->stopExportsCond, NULL);
+        admin->stopExportsActive = true;
+        celixThread_create(&admin->stopExportsThread, NULL, remoteServiceAdmin_stopExportsThread, admin);
+    }
+}
+
+static void remoteServiceAdmin_teardownStopExportsThread(remote_service_admin_t* admin) {
+    if (CELIX_RSA_USE_STOP_EXPORT_THREAD) {
+        celixThreadMutex_lock(&admin->stopExportsMutex);
+        admin->stopExportsActive = false;
+        celixThreadCondition_broadcast(&admin->stopExportsCond);
+        celixThreadMutex_unlock(&admin->stopExportsMutex);
+        celixThread_join(admin->stopExportsThread, NULL);
+        celix_arrayList_destroy(admin->stopExports);
+        celixThreadMutex_destroy(&admin->stopExportsMutex);
+        celixThreadCondition_destroy(&admin->stopExportsCond);
+    }
+}
+
 
 celix_status_t remoteServiceAdmin_stop(remote_service_admin_t *admin) {
     celix_status_t status = CELIX_SUCCESS;
@@ -298,8 +366,16 @@ celix_status_t remoteServiceAdmin_stop(remote_service_admin_t *admin) {
         for (i = 0; i < arrayList_size(exports); i++) {
             export_registration_t *export = arrayList_get(exports, i);
             if (export != NULL) {
-                exportRegistration_stop(export);
-                exportRegistration_destroy(export);
+                if (CELIX_RSA_USE_STOP_EXPORT_THREAD) {
+                    celixThreadMutex_lock(&admin->stopExportsMutex);
+                    exportRegistration_setActive(export, false);
+                    celix_arrayList_add(admin->stopExports, export);
+                    celixThreadCondition_broadcast(&admin->stopExportsCond);
+                    celixThreadMutex_unlock(&admin->stopExportsMutex);
+                } else {
+                    exportRegistration_stop(export);
+                    exportRegistration_destroy(export);
+                }
             }
         }
         arrayList_destroy(exports);
@@ -307,6 +383,8 @@ celix_status_t remoteServiceAdmin_stop(remote_service_admin_t *admin) {
     hashMapIterator_destroy(iter);
     celixThreadRwlock_unlock(&admin->exportedServicesLock);
 
+    remoteServiceAdmin_teardownStopExportsThread(admin);
+
     celixThreadMutex_lock(&admin->importedServicesLock);
     int i;
     int size = arrayList_size(admin->importedServices);
@@ -555,8 +633,16 @@ celix_status_t remoteServiceAdmin_removeExportedService(remote_service_admin_t *
             arrayList_destroy(exports);
         }
 
-        exportRegistration_close(registration);
-        exportRegistration_destroy(registration);
+        if (CELIX_RSA_USE_STOP_EXPORT_THREAD) {
+            celixThreadMutex_lock(&admin->stopExportsMutex);
+            exportRegistration_setActive(registration, false);
+            celix_arrayList_add(admin->stopExports, registration);
+            celixThreadCondition_broadcast(&admin->stopExportsCond);
+            celixThreadMutex_unlock(&admin->stopExportsMutex);
+        } else {
+            exportRegistration_stop(registration);
+            exportRegistration_destroy(registration);
+        }
 
         celixThreadRwlock_unlock(&admin->exportedServicesLock);
 


Mime
View raw message