stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From isu...@apache.org
Subject [2/4] initial changes for hierarchical topology locking
Date Tue, 07 Oct 2014 13:40:31 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index 67c9b67..4473add 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberSuspendedMessageProcessor extends MessageProcessor {
@@ -54,96 +55,108 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(message, MemberSuspendedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberSuspendedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.Suspended) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.Suspended);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.Suspended) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already suspended: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Suspended);
 
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member suspended: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index 5b5cbc9..3619b53 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberTerminatedMessageProcessor extends MessageProcessor {
@@ -53,87 +54,99 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(message, MemberTerminatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberTerminatedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if(member != null) {
-                // Apply member filter
-                if(TopologyMemberFilter.getInstance().isActive()) {
-                    if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                        }
-                        return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if(member != null) {
+            // Apply member filter
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                     }
+                    return false;
                 }
             }
+        }
 
-            // Notify event listeners before removing member object
-            notifyEventListeners(event);
+        // Notify event listeners before removing member object
+        notifyEventListeners(event);
 
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	// Remove member from the cluster
-            	cluster.removeMember(member);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
-
-            return true;
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Remove member from the cluster
+            cluster.removeMember(member);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member terminated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
index 2c216f0..1c4be8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceCreatedMessageProcessor extends MessageProcessor {
@@ -43,44 +44,21 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
 
         if (ServiceCreatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ServiceCreatedEvent event = (ServiceCreatedEvent) Util.jsonToObject(message, ServiceCreatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
+            TopologyManager.acquireWriteLockForServices();
+            try {
+                return doProcess(event, topology);
 
-            // Validate event against the existing topology
-            if (topology.serviceExists(event.getServiceName())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	Service service = new Service(event.getServiceName(), event.getServiceType());
-            	service.addPorts(event.getPorts());
-            	topology.addService(service);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Service created: [service] %s", event.getServiceName()));
-            	}
+            } finally {
+                TopologyManager.releaseWriteLockForServices();
             }
 
-
-            notifyEventListeners(event);
-            return true;
-
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +68,40 @@ public class ServiceCreatedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ServiceCreatedEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        if (topology.serviceExists(event.getServiceName())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service already created: [service] %s", event.getServiceName()));
+            }
+        } else {
+
+            // Apply changes to the topology
+            Service service = new Service(event.getServiceName(), event.getServiceType());
+            service.addPorts(event.getPorts());
+            topology.addService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service created: [service] %s", event.getServiceName()));
+            }
+        }
+
+
+        notifyEventListeners(event);
+        return true;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
index 2c0bc70..a38cbdc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ServiceRemovedMessageProcessor extends MessageProcessor {
@@ -49,38 +50,14 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ServiceRemovedEvent event = (ServiceRemovedEvent) Util.jsonToObject(message, ServiceRemovedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
-
-            // Notify event listeners before removing service object
-            notifyEventListeners(event);
+            TopologyManager.acquireWriteLockForServices();
+            try {
+                return doProcess(event, topology);
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	topology.removeService(service);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Service removed: [service] %s", event.getServiceName()));
-            	}
+            } finally {
+                TopologyManager.releaseWriteLockForServices();
             }
 
-            return true;
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -90,4 +67,40 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (ServiceRemovedEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Notify event listeners before removing service object
+        notifyEventListeners(event);
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+        } else {
+
+            // Apply changes to the topology
+            topology.removeService(service);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Service removed: [service] %s", event.getServiceName()));
+            }
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index e3ddfa3..db9e8b1 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -47,7 +47,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
     private GroupActivatedProcessor groupActivatedProcessor;
-    private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
+    //private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor;
     private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
     private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor;
     private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
@@ -109,11 +109,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         add(applicationActivatedMessageProcessor);
 
         if (log.isDebugEnabled()) {
-            log.debug("Grouping: added applicationCreatedMessageProcessor, applicationRemovedMessageProcessor: " +
-            		applicationCreatedMessageProcessor + " / " + applicationRemovedMessageProcessor);
-        }
-
-        if (log.isDebugEnabled()) {
             log.debug("Topology message processor chain initialized X1");
         }
     }
@@ -153,9 +148,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             applicationCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationRemovedEventListener) {
             applicationRemovedMessageProcessor.addEventListener(eventListener);
-        	if (log.isDebugEnabled()) {
-                log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener);
-            }
         } else if (eventListener instanceof ApplicationActivatedEventListener) {
             applicationActivatedMessageProcessor.addEventListener(eventListener);
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 9cc8f78..218c441 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -69,15 +69,15 @@ class TopologyEventMessageDelegator implements Runnable {
                         log.debug(String.format("Topology event message [%s] received from queue: %s", type, messageQueue.getClass()));
                     }
 
-                    try {
-                        TopologyManager.acquireWriteLock();
+//                    try {
+//                        TopologyManager.acquireWriteLock();
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Delegating topology event message: %s", type));
                         }
                         processorChain.process(type, json, TopologyManager.getTopology());
-                    } finally {
-                        TopologyManager.releaseWriteLock();
-                    }
+//                    } finally {
+//                        TopologyManager.releaseWriteLock();
+//                    }
 
                 } catch (Exception e) {
                     log.error("Failed to retrieve topology event message", e);

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
index 5df66bd..2ffd7f6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyManager.java
@@ -21,7 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -30,43 +36,454 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  *  Usage:
  *  Acquire a relevant lock and invoke the getTopology() method inside a try block.
  *  Once processing is done release the lock using a finally block.
+ *
+ *  Acquiring Locks:
+ *
+ *  Stratos supports hierarchical locking. As per the practice, we need to lock the
+ *  hierarchy from root level till the relevant sub tree.
+ *
+ *  Acquire a write lock:
+ *
+ *  From root level, acquire read lock, and acquire a write lock only for the
+ *  relevant sub tree.
+ *
+ *  Acquire a read lock:
+ *
+ *  From root level, acquire read locks till the relevant sub tree
+ *
+ *  Examples -
+ *
+ *  Example 1: Acquiring write lock for a Cluster to modify the Cluster object -
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire read lock for the particular Service, to which the cluster belongs to
+ *           3. acquire write lock for the Cluster
+ *
+ *           releasing:
+ *           1. release write lock for the Cluster
+ *           2. release read lock for the particular Service
+ *           3. release read lock for all Services
+ *
+ *  Example 2: Acquiring write lock to add a new Cluster object -
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire write lock for the particular Service, to which the cluster belongs to
+ *
+ *           releasing:
+ *           1. release write lock for the particular Service
+ *           2. release read lock for all Services
+ *
+ *  Example 3: Acquiring read lock to read Cluster information
+ *           acquiring:
+ *           1. acquire read lock for all Services
+ *           2. acquire read lock for the particular Service, to which the cluster belongs to
+ *           3. acquire read lock for the relevant Cluster
+ *
+ *           releasing:
+ *           1. release read lock for the relevant Cluster
+ *           2. release read lock for the particular Service
+ *           3. release read lock for all Services
+ *
+ *  Example 4: Acquiring the write lock to add a deploy a Cartridge (add a new Service)
+ *           acquire:
+ *           1. acquire write lock for all Services
+ *
+ *           release:
+ *           1. release write lock for all Services
  */
 public class TopologyManager {
     private static final Log log = LogFactory.getLog(TopologyManager.class);
 
     private static volatile Topology topology;
+    private static final TopologyLockHierarchy topologyLockHierarchy =
+            TopologyLockHierarchy.getInstance();
     private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
     private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
+
+    // Top level locks - should be used to lock the entire Topology
+
     public static void acquireReadLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Read lock acquired");
+            log.debug("Read lock acquired for Topology");
         }
         readLock.lock();
     }
 
     public static void releaseReadLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Read lock released");
+            log.debug("Read lock released for Topology");
         }
         readLock.unlock();
     }
 
     public static void acquireWriteLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Write lock acquired");
+            log.debug("Write lock acquired for Topology");
         }
         writeLock.lock();
     }
 
     public static void releaseWriteLock() {
         if(log.isDebugEnabled()) {
-            log.debug("Write lock released");
+            log.debug("Write lock released for Topology");
         }
         writeLock.unlock();
     }
 
+    // Application, Service and Cluster read locks
+
+    public static void acquireReadLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().acquireReadLock();
+    }
+
+    public static void releaseReadLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock released for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().releaseReadLock();
+    }
+
+    public static void acquireReadLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock acquired for Services");
+        }
+        topologyLockHierarchy.getServiceLock().acquireReadLock();
+    }
+
+    public static void releaseReadLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock released for Services");
+        }
+        topologyLockHierarchy.getServiceLock().releaseReadLock();
+    }
+
+    // Application, Service and Cluster write locks
+
+    public static void acquireWriteLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().acquireWriteLock();
+    }
+
+    public static void releaseWriteLockForApplications() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released for Applications");
+        }
+        topologyLockHierarchy.getApplicatioLock().releaseWritelock();
+    }
+
+    public static void acquireWriteLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Services");
+        }
+        topologyLockHierarchy.getServiceLock().acquireWriteLock();
+    }
+
+    public static void releaseWriteLockForServices() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released for Services");
+        }
+        topologyLockHierarchy.getServiceLock().releaseWritelock();
+    }
+
+    public static void acquireReadLockForService (String serviceName) {
+
+        // acquire read lock for all Services
+        acquireReadLockForServices();
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Service " + serviceName);
+            }
+        }
+    }
+
+    public static void releaseReadLockForService (String serviceName) {
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Service " + serviceName);
+            }
+        }
+
+        // release read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    public static void acquireWriteLockForService (String serviceName) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForServices();
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Service " + serviceName);
+            }
+        }
+    }
+
+    public static void releaseWriteLockForService (String serviceName) {
+
+        TopologyLock topologyServiceLock = topologyLockHierarchy.getTopologyLockForService(serviceName);
+        if (topologyServiceLock == null) {
+            handleLockNotFound("Topology lock not found for Service " + serviceName);
+
+        } else {
+            topologyServiceLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Service " + serviceName);
+            }
+        }
+
+        // release read lock for all Services
+        releaseReadLockForServices();
+    }
+
+    public static void acquireReadLockForCluster (String serviceName, String clusterId) {
+
+        // acquire read lock for the relevant Services
+        acquireReadLockForService(serviceName);
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        }  else {
+            // acquire read lock for the relevant Cluster
+            topologyClusterLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Cluster " + clusterId);
+            }
+        }
+    }
+
+    public static void releaseReadLockForCluster (String serviceName, String clusterId) {
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            // release read lock for the relevant Cluster
+            topologyClusterLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Cluster " + clusterId);
+            }
+        }
+
+        // release read lock for relevant Service
+        releaseReadLockForService(serviceName);
+    }
+
+    public static void acquireWriteLockForCluster (String serviceName, String clusterId) {
+
+        // acquire read lock for the relevant Services
+        acquireReadLockForService(serviceName);
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            topologyClusterLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Cluster " + clusterId);
+            }
+        }
+    }
+
+    public static void releaseWriteLockForCluster (String serviceName, String clusterId) {
+
+        TopologyLock topologyClusterLock = topologyLockHierarchy.getTopologyLockForCluster(clusterId);
+        if (topologyClusterLock == null) {
+            handleLockNotFound("Topology lock not found for Cluster " + clusterId);
+
+        } else {
+            topologyClusterLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Cluster " + clusterId);
+            }
+        }
+
+        // release read lock for relevant Service
+        releaseReadLockForService(serviceName);
+    }
+
+    public static void acquireReadLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's cluster's and acquire read locks
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // acquire read locks for services and clusters
+                    acquireReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+
+            } else {
+               if (log.isDebugEnabled()) {
+                   log.debug("No Cluster Data found in Application " + appId);
+               }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    public static void releaseReadLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseReadLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Read lock released for Application " + appId);
+            }
+        }
+
+        // release read lock for all Applications
+        releaseReadLockForApplications();
+
+        // get the Application's cluster information
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release read locks for clusters and services
+                    releaseReadLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+    }
+
+    public static synchronized void acquireWriteLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        acquireReadLockForApplications();
+
+        // get the Application's cluster's and acquire read locks
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    acquireWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            topologyAppLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    public static synchronized void releaseWriteLockForApplication (String appId) {
+
+        TopologyLock topologyAppLock = topologyLockHierarchy.getTopologyLockForApplication(appId);
+        if (topologyAppLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            topologyAppLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Application " + appId);
+            }
+        }
+
+        // release read lock for all Applications
+        releaseReadLockForApplications();
+
+        // get the Application's cluster's and acquire read
+        Application application = topology.getApplication(appId);
+        if (application == null) {
+            log.warn("Application " + appId + " is not found in the Topology");
+
+        } else {
+            Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
+            if (clusterData != null && !clusterData.isEmpty()) {
+                for (ClusterDataHolder clusterDataHolder : clusterData) {
+                    // release read locks for clusters and services
+                    releaseWriteLockForCluster(clusterDataHolder.getServiceType(), clusterDataHolder.getClusterId());
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("No Cluster Data found in Application " + appId);
+                }
+            }
+        }
+    }
+
+    private static void handleLockNotFound (String errorMsg) {
+        log.warn(errorMsg);
+        //throw new RuntimeException(errorMsg);
+    }
+
     public static Topology getTopology() {
         if (topology == null) {
             synchronized (TopologyManager.class){


Mime
View raw message