stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From isu...@apache.org
Subject [3/4] initial changes for hierarchical topology locking
Date Tue, 07 Oct 2014 13:40:32 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index c007343..94b9650 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterCreatedMessageProcessor extends MessageProcessor {
@@ -41,83 +42,97 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
 
     @Override
     public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
 
+        Topology topology = (Topology) object;
         if (ClusterCreatedEvent.class.getName().equals(type)) {
             // Return if topology has not been initialized
-            if (!topology.isInitialized())
+            if (!topology.isInitialized()) {
                 return false;
+            }
 
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
-            }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Validate event properties
-            Cluster cluster = event.getCluster();
-            if(cluster == null) {
-                String msg = "Cluster object of cluster created event is null.";
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
-            if (cluster.getHostNames().isEmpty()) {
-                throw new RuntimeException("Host name/s not found in cluster created event");
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        }
+    }
+
+    private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            if (service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
-                }
-			} else {
-
-				// Apply changes to the topology
-				service.addCluster(cluster);
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Cluster created: %s",
-							cluster.toString()));
-				}
-			}
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                }
+                return false;
+            }
+        }
 
+        // Validate event properties
+        Cluster cluster = event.getCluster();
+        if(cluster == null) {
+            String msg = "Cluster object of cluster created event is null.";
+            log.error(msg);
+            throw new RuntimeException(msg);
+        }
+        if (cluster.getHostNames().isEmpty()) {
+            throw new RuntimeException("Host name/s not found in cluster created event");
+        }
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        if (service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+            // Apply changes to the topology
+            service.addCluster(cluster);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster created: %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
index f125c54..8629363 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterMaintenanceModeMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
@@ -49,64 +50,77 @@ public class ClusterMaintenanceModeMessageProcessor extends MessageProcessor {
             ClusterMaintenanceModeEvent event = (ClusterMaintenanceModeEvent) Util.
                                 jsonToObject(message, ClusterMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterMaintenanceModeEvent event,Topology topology)  {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
+        }
 
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
-			} else {
-			    // Apply changes to the topology
-                cluster.setStatus(Status.In_Maintenance);
-				if (log.isInfoEnabled()) {
-					log.info(String.format("Cluster updated as maintenance mode: %s",
-							cluster.toString()));
-				}
-			}
+                return false;
+            }
+        }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
 
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            // Apply changes to the topology
+            cluster.setStatus(Status.In_Maintenance);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster updated as maintenance mode: %s",
+                        cluster.toString()));
             }
         }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 69ef5b0..1dfb929 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class ClusterRemovedMessageProcessor extends MessageProcessor {
@@ -50,65 +51,79 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             ClusterRemovedEvent event = (ClusterRemovedEvent) Util.jsonToObject(message, ClusterRemovedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireReadLockForServices();
+            TopologyManager.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForService(event.getServiceName());
+                TopologyManager.releaseReadLockForServices();
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (ClusterRemovedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
+        }
 
-            // Notify event listeners before removing the cluster object
-            notifyEventListeners(event);
-
-            if (!service.clusterExists(event.getClusterId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(),
-                            event.getClusterId()));
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
-            } else {
-            	
-            	// Apply changes to the topology
-            	service.removeCluster(event.getClusterId());
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
-            				event.getServiceName(), event.getClusterId()));
-            	}
+                return false;
+            }
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
             }
+            return false;
+        }
+
+        // Notify event listeners before removing the cluster object
+        notifyEventListeners(event);
 
-            return true;
+        if (!service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(),
+                        event.getClusterId()));
+            }
         } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+
+            // Apply changes to the topology
+            service.removeCluster(event.getClusterId());
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster removed from service: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
             }
         }
+
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 135bdae..6d5cb8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 import java.util.ArrayList;
@@ -49,102 +50,20 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
         if (CompleteTopologyEvent.class.getName().equals(type)) {
         	// Parse complete message and build event
         	CompleteTopologyEvent event = (CompleteTopologyEvent) Util.jsonToObject(message, CompleteTopologyEvent.class);
-        	
-            // if topology has not already initialized
-			if (!topology.isInitialized()) {
-
-				// Apply service filter
-				if (TopologyServiceFilter.getInstance().isActive()) {
-					// Add services included in service filter
-					for (Service service : event.getTopology().getServices()) {
-						if (TopologyServiceFilter.getInstance()
-								.serviceNameIncluded(service.getServiceName())) {
-							topology.addService(service);
-						} else {
-							if (log.isDebugEnabled()) {
-								log.debug(String.format(
-										"Service is excluded: [service] %s",
-										service.getServiceName()));
-							}
-						}
-					}
-				} else {
-					// Add all services
-					topology.addServices(event.getTopology().getServices());
-				}
-
-				// Apply cluster filter
-				if (TopologyClusterFilter.getInstance().isActive()) {
-					for (Service service : topology.getServices()) {
-						List<Cluster> clustersToRemove = new ArrayList<Cluster>();
-						for (Cluster cluster : service.getClusters()) {
-							if (TopologyClusterFilter.getInstance()
-									.clusterIdExcluded(cluster.getClusterId())) {
-								clustersToRemove.add(cluster);
-							}
-						}
-						for (Cluster cluster : clustersToRemove) {
-							service.removeCluster(cluster);
-							if (log.isDebugEnabled()) {
-								log.debug(String.format(
-										"Cluster is excluded: [cluster] %s",
-										cluster.getClusterId()));
-							}
-						}
-					}
-				}
-
-				// Apply member filter
-				if (TopologyMemberFilter.getInstance().isActive()) {
-					for (Service service : topology.getServices()) {
-						for (Cluster cluster : service.getClusters()) {
-							List<Member> membersToRemove = new ArrayList<Member>();
-							for (Member member : cluster.getMembers()) {
-								if (TopologyMemberFilter.getInstance()
-										.lbClusterIdExcluded(
-												member.getLbClusterId())) {
-									membersToRemove.add(member);
-								}
-							}
-							for (Member member : membersToRemove) {
-								cluster.removeMember(member);
-								if (log.isDebugEnabled()) {
-									log.debug(String
-											.format("Member is excluded: [member] %s [lb-cluster-id] %s",
-													member.getMemberId(),
-													member.getLbClusterId()));
-								}
-							}
-						}
-					}
-				}
-
-                // add existing Applications to Topology
-                Collection<Application> applications = event.getTopology().getApplications();
-                if (applications != null && !applications.isEmpty()) {
-                    for (Application application : applications) {
-                        topology.addApplication(application);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Application with id [ " +  application.getId() + " ] added to Topology");
-                        }
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("No Application information found in Complete Topology event");
-                    }
-                }
 
-				if (log.isInfoEnabled()) {
-					log.info("Topology initialized");
-				}
+            if (!topology.isInitialized()) {
+                TopologyManager.acquireWriteLock();
+
+                try {
+                    return doProcess(event, topology);
 
-				// Set topology initialized
-				topology.setInitialized(true);
-			}
+                } finally {
+                    TopologyManager.releaseWriteLock();
+                }
+            } else {
+                return true;
+            }
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
@@ -153,4 +72,99 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
             return false;
         }
     }
+
+    private boolean doProcess (CompleteTopologyEvent event, Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            // Add services included in service filter
+            for (Service service : event.getTopology().getServices()) {
+                if (TopologyServiceFilter.getInstance()
+                        .serviceNameIncluded(service.getServiceName())) {
+                    topology.addService(service);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Service is excluded: [service] %s",
+                                service.getServiceName()));
+                    }
+                }
+            }
+        } else {
+            // Add all services
+            topology.addServices(event.getTopology().getServices());
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                List<Cluster> clustersToRemove = new ArrayList<Cluster>();
+                for (Cluster cluster : service.getClusters()) {
+                    if (TopologyClusterFilter.getInstance()
+                            .clusterIdExcluded(cluster.getClusterId())) {
+                        clustersToRemove.add(cluster);
+                    }
+                }
+                for (Cluster cluster : clustersToRemove) {
+                    service.removeCluster(cluster);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format(
+                                "Cluster is excluded: [cluster] %s",
+                                cluster.getClusterId()));
+                    }
+                }
+            }
+        }
+
+        // Apply member filter
+        if (TopologyMemberFilter.getInstance().isActive()) {
+            for (Service service : topology.getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    List<Member> membersToRemove = new ArrayList<Member>();
+                    for (Member member : cluster.getMembers()) {
+                        if (TopologyMemberFilter.getInstance()
+                                .lbClusterIdExcluded(
+                                        member.getLbClusterId())) {
+                            membersToRemove.add(member);
+                        }
+                    }
+                    for (Member member : membersToRemove) {
+                        cluster.removeMember(member);
+                        if (log.isDebugEnabled()) {
+                            log.debug(String
+                                    .format("Member is excluded: [member] %s [lb-cluster-id] %s",
+                                            member.getMemberId(),
+                                            member.getLbClusterId()));
+                        }
+                    }
+                }
+            }
+        }
+
+        // add existing Applications to Topology
+        Collection<Application> applications = event.getTopology().getApplications();
+        if (applications != null && !applications.isEmpty()) {
+            for (Application application : applications) {
+                topology.addApplication(application);
+                if (log.isDebugEnabled()) {
+                    log.debug("Application with id [ " +  application.getId() + " ] added to Topology");
+                }
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("No Application information found in Complete Topology event");
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Topology initialized");
+        }
+
+        // Set topology initialized
+        topology.setInitialized(true);
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
index 627d9a9..7200431 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
@@ -21,11 +21,9 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent;
 import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
-import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 /**
@@ -53,34 +51,16 @@ public class GroupActivatedProcessor extends MessageProcessor {
             GroupActivatedEvent event = (GroupActivatedEvent) Util.
                     jsonToObject(message, GroupActivatedEvent.class);
 
-            // Validate event against the existing topology
-            Application application = topology.getApplication(event.getAppId());
-            if (application == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Application does not exist: [service] %s",
-                            event.getAppId()));
-                }
-                return false;
-            }
-            Group group = application.getGroup(event.getGroupId());
+            TopologyManager.acquireReadLockForApplications();
+            TopologyManager.acquireWriteLockForApplication(event.getAppId());
 
-            if (group == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                            event.getGroupId()));
-                }
-            } else {
-                // Apply changes to the topology
-                group.setStatus(Status.Activated);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Group updated as activated : %s",
-                            group.toString()));
-                }
-            }
+            try {
+                return doProcess(event, topology);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
+            } finally {
+                TopologyManager.releaseWriteLockForApplication(event.getAppId());
+                TopologyManager.releaseReadLockForApplications();
+            }
 
         } else {
             if (nextProcessor != null) {
@@ -91,4 +71,36 @@ public class GroupActivatedProcessor extends MessageProcessor {
             }
         }
     }
+
+    private boolean doProcess (GroupActivatedEvent event,Topology topology) {
+        // Marks the event's group as Activated; process() calls this while holding the application locks.
+        // Validate event against the existing topology: the application must already be present
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s", // NOTE(review): label says [service] but the value is the app id
+                        event.getAppId()));
+            }
+            return false; // unknown application: nothing is updated and listeners are not notified
+        }
+        Group group = application.getGroup(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology: only the group's status is modified
+            group.setStatus(Status.Activated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Group updated as activated : %s",
+                        group.toString()));
+            }
+        }
+
+        // Notify event listeners (also reached when the group was not found above; NOTE(review): confirm intended)
+        notifyEventListeners(event);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
index 8e4e1b1..2d3f8b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/InstanceSpawnedMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class InstanceSpawnedMessageProcessor extends MessageProcessor {
@@ -50,92 +51,103 @@ public class InstanceSpawnedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             InstanceSpawnedEvent event = (InstanceSpawnedEvent) Util.jsonToObject(message, InstanceSpawnedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (InstanceSpawnedEvent event,Topology topology){
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
-                    }
-                    return false;
+        // Called by process() under the cluster write lock to add the spawned member. Apply service filter first
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply cluster filter (keyed on the cluster id carried by the event)
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply member filter (keyed on the lb cluster id carried by the event; the member does not exist yet)
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(event.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", event.getLbClusterId()));
                 }
                 return false;
             }
-            if (cluster.memberExists(event.getMemberId())) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
-            	member.setStatus(MemberStatus.Created);
-            	member.setMemberPublicIp(event.getMemberPublicIp());
-            	member.setMemberIp(event.getMemberIp());
-            	member.setLbClusterId(event.getLbClusterId());
-                member.setProperties(event.getProperties());
-            	cluster.addMember(member);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        }
+
+        // Validate event against the existing topology: the service and cluster must already be present
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        if (cluster.memberExists(event.getMemberId())) { // duplicate event: warn only, listeners are still notified below
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already exists: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology: build a member from the event in Created status and add it to the cluster
+            Member member = new Member(event.getServiceName(), event.getClusterId(), event.getNetworkPartitionId(), event.getPartitionId(), event.getMemberId());
+            member.setStatus(MemberStatus.Created);
+            member.setMemberPublicIp(event.getMemberPublicIp());
+            member.setMemberIp(event.getMemberIp());
+            member.setLbClusterId(event.getLbClusterId());
+            member.setProperties(event.getProperties());
+            cluster.addMember(member);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member created: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners (also reached when the member already existed)
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index a5d701d..ec1b5ec 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberActivatedMessageProcessor extends MessageProcessor {
@@ -54,111 +55,123 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(message, MemberActivatedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (MemberActivatedEvent event,Topology topology) {
+        // Marks an existing member as Activated; process() calls this while holding the cluster write lock.
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
+                return false;
             }
+        }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
+                return false;
             }
+        }
 
-            // Validate event properties
-            if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
-                throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
-                        event.getServiceName(),
-                        event.getClusterId(),
-                        event.getMemberId()));
+        // Validate event properties: a member IP and at least one port are mandatory; a missing one throws instead of returning false
+        if ((event.getMemberIp() == null) || event.getMemberIp().isEmpty()) {
+            throw new RuntimeException(String.format("No ip address found in member activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+        if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
+            throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+                    event.getServiceName(),
+                    event.getClusterId(),
+                    event.getMemberId()));
+        }
+
+        // Validate event against the existing topology: service, cluster and member must all be present
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
             }
-            if ((event.getPorts() == null) || (event.getPorts().size() == 0)) {
-                throw new RuntimeException(String.format("No ports found in member activated event: [service] %s [cluster] %s [member] %s",
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
                         event.getServiceName(),
                         event.getClusterId(),
                         event.getMemberId()));
             }
+            return false;
+        }
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s", event.getServiceName()));
-                }
-                return false;
-            }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
-                }
-                return false;
-            }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
+        // Apply member filter (uses the lb cluster id stored on the existing member, not a value from the event)
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
                 return false;
             }
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
-                }
+        if (member.getStatus() == MemberStatus.Activated) { // repeated activation: warn only, listeners are still notified below
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
-            if (member.getStatus() == MemberStatus.Activated) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already activated: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.addPorts(event.getPorts());
-            	member.setMemberIp(event.getMemberIp());
-            	member.setStatus(MemberStatus.Activated);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
-            }
+            // Apply changes to the topology: record the event's ports and IP on the member, then set its status to Activated
+            member.addPorts(event.getPorts());
+            member.setMemberIp(event.getMemberIp());
+            member.setStatus(MemberStatus.Activated);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member activated: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+        // Notify event listeners (also reached when the member was already activated)
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b6dc489..b252a61 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -27,6 +27,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberMaintenanceModeProcessor extends MessageProcessor {
@@ -51,98 +52,110 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor {
             MemberMaintenanceModeEvent event = (MemberMaintenanceModeEvent) Util.
                                             jsonToObject(message, MemberMaintenanceModeEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberMaintenanceModeEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.In_Maintenance) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as In_Maintenance: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.In_Maintenance);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.In_Maintenance) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as In_Maintenance: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.In_Maintenance);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as In_Maintenance: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 92115aa..f0c3580 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
@@ -50,98 +51,111 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor{
             MemberReadyToShutdownEvent event = (MemberReadyToShutdownEvent) Util.
                                             jsonToObject(message, MemberReadyToShutdownEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberReadyToShutdownEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.ReadyToShutDown) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already updated as Ready to Shutdown: " +
-                            "[service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.ReadyToShutDown);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.ReadyToShutDown) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already updated as Ready to Shutdown: " +
+                        "[service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.ReadyToShutDown);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member updated as Ready to shutdown: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4ace39c8/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 4d93957..508ec39 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Util;
 
 public class MemberStartedMessageProcessor extends MessageProcessor {
@@ -54,97 +55,109 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
             // Parse complete message and build event
             MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(message, MemberStartedEvent.class);
 
-            // Apply service filter
-            if (TopologyServiceFilter.getInstance().isActive()) {
-                if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
-                    // Service is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
-                    }
-                    return false;
-                }
+            TopologyManager.acquireWriteLockForCluster(event.getServiceName(), event.getClusterId());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyManager.releaseWriteLockForCluster(event.getServiceName(), event.getClusterId());
             }
 
-            // Apply cluster filter
-            if (TopologyClusterFilter.getInstance().isActive()) {
-                if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
-                    // Cluster is excluded, do not update topology or fire event
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
-                    }
-                    return false;
-                }
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
+        }
+    }
+
+    private boolean doProcess (MemberStartedEvent event,Topology topology) {
 
-            // Validate event against the existing topology
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Service does not exist: [service] %s",
-                            event.getServiceName()));
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
                 }
                 return false;
             }
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
-                            event.getServiceName(), event.getClusterId()));
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
                 }
                 return false;
             }
-            Member member = cluster.getMember(event.getMemberId());
-            if (member == null) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-                return false;
+        }
+
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
             }
+            return false;
+        }
+        Cluster cluster = service.getCluster(event.getClusterId());
+        if (cluster == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s",
+                        event.getServiceName(), event.getClusterId()));
+            }
+            return false;
+        }
+        Member member = cluster.getMember(event.getMemberId());
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member does not exist: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
+            }
+            return false;
+        }
 
-            // Apply member filter
-            if(TopologyMemberFilter.getInstance().isActive()) {
-                if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
-                    }
-                    return false;
+        // Apply member filter
+        if(TopologyMemberFilter.getInstance().isActive()) {
+            if(TopologyMemberFilter.getInstance().lbClusterIdExcluded(member.getLbClusterId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is excluded: [lb-cluster-id] %s", member.getLbClusterId()));
                 }
+                return false;
             }
+        }
 
-            if (member.getStatus() == MemberStatus.Starting) {
-                if (log.isWarnEnabled()) {
-                    log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
-                            event.getServiceName(),
-                            event.getClusterId(),
-                            event.getMemberId()));
-                }
-            } else {
-            	
-            	// Apply changes to the topology
-            	member.setStatus(MemberStatus.Starting);
-            	
-            	if (log.isInfoEnabled()) {
-            		log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
-            				event.getServiceName(),
-            				event.getClusterId(),
-            				event.getMemberId()));
-            	}
+        if (member.getStatus() == MemberStatus.Starting) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Member already started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
+        } else {
 
+            // Apply changes to the topology
+            member.setStatus(MemberStatus.Starting);
 
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member started: [service] %s [cluster] %s [member] %s",
+                        event.getServiceName(),
+                        event.getClusterId(),
+                        event.getMemberId()));
             }
         }
+
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
     }
 }


Mime
View raw message