stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [1/2] git commit: adding complete applications
Date Fri, 31 Oct 2014 09:24:49 GMT
Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 7f05298c3 -> 696bc0d24


adding complete applications


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0f7d7b0d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0f7d7b0d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0f7d7b0d

Branch: refs/heads/4.0.0-grouping
Commit: 0f7d7b0d0db9f643fd232f0ef2689cf62553dee7
Parents: 7f05298
Author: reka <rthirunavukkarasu23@gmail.com>
Authored: Fri Oct 31 14:53:49 2014 +0530
Committer: reka <rthirunavukkarasu23@gmail.com>
Committed: Fri Oct 31 14:54:33 2014 +0530

----------------------------------------------------------------------
 .../domain/applications/Applications.java       |  22 ++-
 .../locking/ApplicationLockHierarchy.java       |   3 +
 .../applications/ApplicationCreatedEvent.java   |  11 +-
 .../applications/CompleteApplicationsEvent.java |  44 ++++++
 .../ClusterStatusClusterCreatedEvent.java       |  52 -------
 .../ClusterStatusClusterResettedEvent.java      |  52 +++++++
 .../event/topology/ClusterCreatedEvent.java     |  26 +---
 .../event/topology/ClusterResetEvent.java       |  56 +++++++
 .../CompleteApplicationsEventListener.java      |  26 ++++
 ...lusterStatusClusterCreatedEventListener.java |  24 ---
 .../ClusterStatusClusterResetEventListener.java |  24 +++
 .../ApplicationCreatedMessageProcessor.java     |  32 +---
 .../CompleteApplicationsMessageProcessor.java   | 111 ++++++++++++++
 .../updater/ApplicationsUpdater.java            | 140 ++++++++++++++++++
 ...terStatusClusterCreatedMessageProcessor.java |  58 --------
 ...usterStatusClusterResetMessageProcessor.java |  58 ++++++++
 .../ClusterStatusMessageProcessorChain.java     |   6 +-
 .../ClusterCreatedMessageProcessor.java         |  35 +++--
 .../topology/ClusterResetMessageProcessor.java  | 146 +++++++++++++++++++
 19 files changed, 721 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
index 9455a4c..9e8cf3e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java
@@ -32,15 +32,33 @@ public class Applications implements Serializable {
 
     private Map<String, Application> applicationMap;
 
+    private boolean initialized;
+
     public Applications () {
         this.applicationMap = new HashMap<String, Application>();
     }
 
     public void addApplication (Application application) {
-        this.applicationMap.put(application.getUniqueIdentifier(), application);
+        this.getApplications().put(application.getUniqueIdentifier(), application);
     }
 
     public Application getApplication (String appId) {
-        return this.applicationMap.get(appId);
+        return this.getApplications().get(appId);
+    }
+
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    public void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+
+    public boolean applicationExists(String appId) {
+        return this.getApplications().containsKey(appId);
+    }
+
+    public Map<String, Application> getApplications() {
+        return applicationMap;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
index 71bfe03..cc31892 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java
@@ -85,4 +85,7 @@ public class ApplicationLockHierarchy {
         }
     }
 
+    public ApplicationLock getApplicationLock() {
+        return applicationLock;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java
index dfb90f2..26ba7f2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java
@@ -18,6 +18,7 @@
  */
 package org.apache.stratos.messaging.event.applications;
 
+import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.event.Event;
 
 import java.io.Serializable;
@@ -28,13 +29,13 @@ import java.io.Serializable;
 public class ApplicationCreatedEvent extends Event implements Serializable {
     private static final long serialVersionUID = 2625412714611885089L;
 
-    private String appId;
+    private Application application;
 
-    public ApplicationCreatedEvent(String appId) {
-        this.appId = appId;
+    public ApplicationCreatedEvent(Application application) {
+        this.application = application;
     }
 
-    public String getAppId() {
-        return appId;
+    public Application getApplication() {
+        return application;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java
new file mode 100644
index 0000000..25035ab
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.applications;
+
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.event.topology.TopologyEvent;
+
+import java.io.Serializable;
+
+/**
+ *  This event is fired periodically with the complete topology. It would be a
+ *  starting point for subscribers to initialize the current state of the topology
+ *  before receiving other topology events.
+ */
+public class CompleteApplicationsEvent extends TopologyEvent implements Serializable {
+    private static final long serialVersionUID = 8580862188444892004L;
+
+    private final Applications applications;
+
+    public CompleteApplicationsEvent(Applications applications) {
+        this.applications = applications;
+    }
+
+    public Applications getApplications() {
+        return applications;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java
deleted file mode 100644
index d28cca1..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.event.cluster.status;
-
-import org.apache.stratos.messaging.event.Event;
-
-/**
- * This event is fired by cartridge agent when it has started the server and
- * applications are ready to serve the incoming requests.
- */
-public class ClusterStatusClusterCreatedEvent extends Event {
-    private static final long serialVersionUID = 2625412714611885089L;
-
-    private final String serviceName;
-    private final String clusterId;
-    private String appId;
-
-    public ClusterStatusClusterCreatedEvent(String appId, String serviceName, String clusterId) {
-        this.serviceName = serviceName;
-        this.clusterId = clusterId;
-        this.appId = appId;
-    }
-
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public String getAppId() {
-        return appId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java
new file mode 100644
index 0000000..68c8e7f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.cluster.status;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class ClusterStatusClusterResettedEvent extends Event {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private final String serviceName;
+    private final String clusterId;
+    private String appId;
+
+    public ClusterStatusClusterResettedEvent(String appId, String serviceName, String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.appId = appId;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java
index 70452ab..4c06f3b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java
@@ -28,31 +28,19 @@ import java.io.Serializable;
 public class ClusterCreatedEvent extends TopologyEvent implements Serializable {
     private static final long serialVersionUID = 2080623816272047762L;
 
-    private final String appId;
-    private final String serviceName;
-	private final String clusterId;
+    private final Cluster cluster;
 
-
-    public ClusterCreatedEvent(String appId, String serviceName, String clusterId) {
-        this.appId = appId;
-        this.serviceName = serviceName;
-        this.clusterId = clusterId;
+    public ClusterCreatedEvent(Cluster cluster) {
+        this.cluster = cluster;
     }
 
-    public String getServiceName() {
-        return serviceName;
-    }
-    
     @Override
     public String toString() {
-        return "ClusterCreatedEvent [serviceName=" + serviceName + ", application=" + appId + "]";
-    }
-
-    public String getClusterId() {
-        return clusterId;
+        return "ClusterCreatedEvent [serviceName=" + cluster.getServiceName() + ", " +
+                "application=" + cluster.getAppId() + " , cluster= " + cluster.getClusterId() + " ]";
     }
 
-    public String getAppId() {
-        return appId;
+    public Cluster getCluster() {
+        return cluster;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java
new file mode 100644
index 0000000..d4d6622
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Cluster activated event will be sent by Autoscaler
+ */
+public class ClusterResetEvent extends Event {
+
+    private final String serviceName;
+    private final String clusterId;
+    private String appId;
+
+    public ClusterResetEvent(String appId, String serviceName, String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.appId = appId;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterActivatedEvent [serviceName=" + serviceName + ", clusterStatus=" +
+                "]";
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java
new file mode 100644
index 0000000..cd1bb24
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.applications;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class CompleteApplicationsEventListener extends EventListener {
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java
deleted file mode 100644
index 6ca5476..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.listener.cluster.status;
-
-import org.apache.stratos.messaging.listener.EventListener;
-
-public abstract class ClusterStatusClusterCreatedEventListener extends EventListener{
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java
new file mode 100644
index 0000000..375ae32
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.cluster.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class ClusterStatusClusterResetEventListener extends EventListener{
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
index 92d61ba..db8e3c8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java
@@ -21,10 +21,10 @@ package org.apache.stratos.messaging.message.processor.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.applications.Applications;
 import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
 import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
 import org.apache.stratos.messaging.util.Util;
@@ -44,10 +44,10 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
     @Override
     public boolean process(String type, String message, Object object) {
 
-        Topology topology = (Topology) object;
+        Applications applications = (Applications) object;
 
         if (ApplicationCreatedEvent.class.getName().equals(type)) {
-            if (!topology.isInitialized()) {
+            if (!applications.isInitialized()) {
                 return false;
             }
 
@@ -58,37 +58,24 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
             }
 
             TopologyUpdater.acquireWriteLockForApplications();
-            // since the Clusters will also get modified, acquire write locks for each Service Type
-            Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively();
-            if (clusterDataHolders != null) {
-                for (ClusterDataHolder clusterData : clusterDataHolders) {
-                    TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
-                }
-            }
-
             try {
-                return doProcess(event, topology);
+                return doProcess(event, applications);
 
             } finally {
-                if (clusterDataHolders != null) {
-                    for (ClusterDataHolder clusterData : clusterDataHolders) {
-                        TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
-                    }
-                }
                 TopologyUpdater.releaseWriteLockForApplications();
             }
 
         } else {
             if (nextProcessor != null) {
                 // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
+                return nextProcessor.process(type, message, applications);
             } else {
                 throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
             }
         }
     }
 
-    private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
+    private boolean doProcess(ApplicationCreatedEvent event, Applications topology) {
 
         // check if required properties are available
         if (event.getApplication() == null) {
@@ -109,9 +96,6 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor {
 
         } else {
             // add application and the clusters to Topology
-            for(Cluster cluster: event.getClusterList()) {
-                topology.getService(cluster.getServiceName()).addCluster(cluster);
-            }
             topology.addApplication(event.getApplication());
         }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
new file mode 100644
index 0000000..53c469b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.Applications;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+import org.apache.stratos.messaging.event.applications.CompleteApplicationsEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class CompleteApplicationsMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(CompleteApplicationsMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Applications applications = (Applications) object;
+
+        if (CompleteApplicationsEvent.class.getName().equals(type)) {
+        	// Parse complete message and build event
+            CompleteApplicationsEvent event = (CompleteApplicationsEvent) Util.
+                    jsonToObject(message, CompleteApplicationsEvent.class);
+
+            if (!applications.isInitialized()) {
+                ApplicationsUpdater.acquireWriteLock();
+
+                try {
+                    doProcess(event, applications);
+
+                } finally {
+                    ApplicationsUpdater.releaseWriteLock();
+                }
+            }
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, applications);
+            }
+            return false;
+        }
+    }
+
+    private void doProcess (CompleteApplicationsEvent event, Applications applications) {
+    // add existing Applications to Topology
+        Collection<Application> applicationsList = event.getApplications().getApplications().values();
+        if (applicationsList != null && !applicationsList.isEmpty()) {
+            for (Application application : applicationsList) {
+                applications.addApplication(application);
+                if (log.isDebugEnabled()) {
+                    log.debug("Application with id [ " +  application.getUniqueIdentifier() + " ] added to Topology");
+                }
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("No Application information found in Complete Topology event");
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Topology initialized");
+        }
+
+        // Set topology initialized
+        applications.setInitialized(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java
new file mode 100644
index 0000000..bbbfbf5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.applications.updater;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.applications.locking.ApplicationLock;
+import org.apache.stratos.messaging.domain.applications.locking.ApplicationLockHierarchy;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLock;
+import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Used to lock the Topology for writes by messaging component
+ *
+ *  Acquire a write lock:
+ *
+ *  From root level, acquire read lock, and acquire a write lock only for the
+ *  relevant sub tree.
+ *
+ *  Example 1: Acquiring write lock for a Cluster to modify the Cluster object -
+ *           acquiring:
+ *           public static void acquireWriteLockForCluster (String serviceName, String clusterId)
+ *
+ *           releasing:
+ *           public static void releaseWriteLockForCluster (String serviceName, String clusterId)
+ *
+ *  Example 2: Acquiring write lock to add a new Cluster object -
+ *           acquiring:
+ *           public static void acquireWriteLockForService (String serviceName)
+ *
+ *           releasing:
+ *           public static void releaseWriteLockForService (String serviceName)
+ *
+ *  Example 3: Acquiring the write lock to add a deploy a Cartridge (add a new Service)
+ *           acquire:
+ *           public static void acquireWriteLockForServices()
+ *
+ *           release:
+ *           public static void releaseWriteLockForServices()
+ */
+
+public class ApplicationsUpdater {
+
+    private static final Log log = LogFactory.getLog(ApplicationsUpdater.class);
+
+    private static volatile ApplicationLockHierarchy applicationLockHierarchy =
+            ApplicationLockHierarchy.getInstance();
+
+    // Top level locks - should be used to lock the entire Topology
+
+    /**
+     * Acquires write lock for the Complete Topology
+     */
+    public static void acquireWriteLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired for Topology");
+        }
+        applicationLockHierarchy.getApplicationLock().acquireWriteLock();
+    }
+
+    /**
+     * Releases write lock for the Complete Topology
+     */
+    public static void releaseWriteLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released for Topology");
+        }
+        applicationLockHierarchy.getApplicationLock().releaseWritelock();
+    }
+
+    /**
+     * Acquires write lock for the Application
+     *
+     * @param appId Application id
+     */
+    public static void acquireWriteLockForApplication (String appId) {
+
+        // acquire read lock for all Applications
+        TopologyManager.acquireReadLockForApplications();
+
+        ApplicationLock applicationLock = applicationLockHierarchy.getLock(appId);
+        if (applicationLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // now, lock Application
+            applicationLock.acquireWriteLock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock acquired for Application " + appId);
+            }
+        }
+    }
+
+    /**
+     * Releases write lock for the Application
+     *
+     * @param appId Application id
+     */
+    public static void releaseWriteLockForApplication (String appId) {
+
+        ApplicationLock applicationLock = applicationLockHierarchy.getLock(appId);
+        if (applicationLock == null)  {
+            handleLockNotFound("Topology lock not found for Application " + appId);
+
+        } else {
+            // release App lock
+            applicationLock.releaseWritelock();
+            if(log.isDebugEnabled()) {
+                log.debug("Write lock released for Application " + appId);
+            }
+        }
+
+        // release read lock for all Applications
+        TopologyManager.releaseReadLockForApplications();
+    }
+
+    private static void handleLockNotFound (String errorMsg) {
+        log.warn(errorMsg);
+        //throw new RuntimeException(errorMsg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
deleted file mode 100644
index 9b4780b..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.cluster.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.util.Util;
-
-
-public class ClusterStatusClusterCreatedMessageProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(ClusterStatusClusterCreatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        if (ClusterStatusClusterCreatedEvent.class.getName().equals(type)) {
-            // Parse complete message and build event
-            ClusterStatusClusterCreatedEvent event = (ClusterStatusClusterCreatedEvent) Util.
-                    jsonToObject(message, ClusterStatusClusterCreatedEvent.class);
-
-            if(log.isDebugEnabled()) {
-                log.debug("Received AppStatusClusterCreatedEvent: " + event.toString());
-            }
-            // Notify event listeners
-            notifyEventListeners(event);
-            return true;
-        } else {
-            if (nextProcessor != null) {
-                return nextProcessor.process(type, message, object);
-            } else {
-                throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java
new file mode 100644
index 0000000..b5bf301
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterResettedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterResetMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterResetMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterResettedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterResettedEvent event = (ClusterStatusClusterResettedEvent) Util.
+                    jsonToObject(message, ClusterStatusClusterResettedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received AppStatusClusterCreatedEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
index 29556ec..42092bc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
@@ -32,13 +32,13 @@ public class ClusterStatusMessageProcessorChain extends MessageProcessorChain {
 
 
     private ClusterStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
-    private ClusterStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private ClusterStatusClusterResetMessageProcessor clusterCreatedMessageProcessor;
     private ClusterStatusClusterInactivateMessageProcessor clusterInactivateMessageProcessor;
     private ClusterStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
     private ClusterStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
     @Override
     protected void initialize() {
-        clusterCreatedMessageProcessor = new ClusterStatusClusterCreatedMessageProcessor();
+        clusterCreatedMessageProcessor = new ClusterStatusClusterResetMessageProcessor();
         add(clusterCreatedMessageProcessor);
 
         clusterActivatedMessageProcessor = new ClusterStatusClusterActivatedMessageProcessor();
@@ -60,7 +60,7 @@ public class ClusterStatusMessageProcessorChain extends MessageProcessorChain {
 
     @Override
     public void addEventListener(EventListener eventListener) {
-        if(eventListener instanceof ClusterStatusClusterCreatedEventListener) {
+        if(eventListener instanceof ClusterStatusClusterResetEventListener) {
             clusterCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ClusterStatusClusterInactivateEventListener) {
             clusterInactivateMessageProcessor.addEventListener(eventListener);

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index b8cd80f..0ff303c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -50,13 +50,13 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
 
             // Parse complete message and build event
             ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForService(event.getServiceName());
+            String serviceName = event.getCluster().getServiceName();
+            TopologyUpdater.acquireWriteLockForService(serviceName);
             try {
                 return doProcess(event, topology);
 
             } finally {
-                TopologyUpdater.releaseWriteLockForService(event.getServiceName());
+                TopologyUpdater.releaseWriteLockForService(serviceName);
             }
 
         } else {
@@ -70,13 +70,15 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
     }
 
     private boolean doProcess (ClusterCreatedEvent event,Topology topology) {
-
+        Cluster cluster = event.getCluster();
+        String serviceName = cluster.getServiceName();
+        String clusterId = cluster.getClusterId();
         // Apply service filter
         if (TopologyServiceFilter.getInstance().isActive()) {
-            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(serviceName)) {
                 // Service is excluded, do not update topology or fire event
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                    log.debug(String.format("Service is excluded: [service] %s", serviceName));
                 }
                 return false;
             }
@@ -84,10 +86,10 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
 
         // Apply cluster filter
         if (TopologyClusterFilter.getInstance().isActive()) {
-            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(clusterId)) {
                 // Cluster is excluded, do not update topology or fire event
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId));
                 }
                 return false;
             }
@@ -105,27 +107,24 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
             throw new RuntimeException("Host name/s not found in cluster created event");
         }*/
         // Validate event against the existing topology
-        Service service = topology.getService(event.getServiceName());
+        Service service = topology.getService(serviceName);
         if (service == null) {
             if (log.isWarnEnabled()) {
                 log.warn(String.format("Service does not exist: [service] %s",
-                        event.getServiceName()));
+                        serviceName));
             }
             return false;
         }
-        if (service.clusterExists(event.getClusterId())) {
+        if (service.clusterExists(clusterId)) {
             if (log.isWarnEnabled()) {
-                log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
-                        event.getClusterId()));
+                log.warn(String.format("Cluster already exists in service: [service] %s " +
+                                "[cluster] %s",serviceName ,
+                        clusterId));
             }
         } else {
 
             // Apply changes to the topology
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (!cluster.isStateTransitionValid(ClusterStatus.Created)) {
-                log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created + " " +
-                        "for cluster " + cluster.getClusterId());
-            }
+            service.addCluster(cluster);
             cluster.setStatus(ClusterStatus.Created);
             if (log.isInfoEnabled()) {
                 log.info(String.format("Cluster created: %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
new file mode 100644
index 0000000..3cfb2dc
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterResetEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+public class ClusterResetMessageProcessor extends MessageProcessor {
+
+    private static final Log log = LogFactory.getLog(ClusterResetMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+
+        Topology topology = (Topology) object;
+        if (ClusterResetEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized()) {
+                return false;
+            }
+
+            // Parse complete message and build event
+            ClusterResetEvent event = (ClusterResetEvent) Util.
+                    jsonToObject(message, ClusterResetEvent.class);
+
+            TopologyUpdater.acquireWriteLockForService(event.getServiceName());
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForService(event.getServiceName());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ClusterResetEvent event,Topology topology) {
+
+        // Apply service filter
+        if (TopologyServiceFilter.getInstance().isActive()) {
+            if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) {
+                // Service is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Service is excluded: [service] %s", event.getServiceName()));
+                }
+                return false;
+            }
+        }
+
+        // Apply cluster filter
+        if (TopologyClusterFilter.getInstance().isActive()) {
+            if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) {
+                // Cluster is excluded, do not update topology or fire event
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId()));
+                }
+                return false;
+            }
+        }
+
+        // Validate event properties
+        /*Cluster cluster = event.getCluster();
+
+        if(cluster == null) {
+            String msg = "Cluster object of cluster created event is null.";
+            log.error(msg);
+            throw new RuntimeException(msg);
+        }
+        if (cluster.getHostNames().isEmpty()) {
+            throw new RuntimeException("Host name/s not found in cluster created event");
+        }*/
+        // Validate event against the existing topology
+        Service service = topology.getService(event.getServiceName());
+        if (service == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Service does not exist: [service] %s",
+                        event.getServiceName()));
+            }
+            return false;
+        }
+        if (service.clusterExists(event.getClusterId())) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(),
+                        event.getClusterId()));
+            }
+        } else {
+
+            // Apply changes to the topology
+            Cluster cluster = service.getCluster(event.getClusterId());
+            if (!cluster.isStateTransitionValid(ClusterStatus.Created)) {
+                log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created + " " +
+                        "for cluster " + cluster.getClusterId());
+            }
+            cluster.setStatus(ClusterStatus.Created);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Cluster reset as Created: %s",
+                        cluster.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}


Mime
View raw message