stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [07/10] removing application status and adding applications, cluster status topic
Date Fri, 31 Oct 2014 07:51:40 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
new file mode 100644
index 0000000..69918e2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterInactivateMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterInactivateEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterInactivateMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterInactivateMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterInactivateEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterInactivateEvent event = (ClusterStatusClusterInactivateEvent) Util.
+                                                jsonToObject(message, ClusterStatusClusterInactivateEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received ClusterInActivateEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
new file mode 100644
index 0000000..64b6c7b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterTerminatedEvent event = (ClusterStatusClusterTerminatedEvent) Util.
+                    jsonToObject(message, ClusterStatusClusterTerminatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received AppStatusClusterTerminatedEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
new file mode 100644
index 0000000..c161dd5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterTerminatingMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class ClusterStatusClusterTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(ClusterStatusClusterTerminatingMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ClusterStatusClusterTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ClusterStatusClusterTerminatingEvent event = (ClusterStatusClusterTerminatingEvent) Util.
+                                                jsonToObject(message, ClusterStatusClusterTerminatingEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received AppStatusClusterTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(String.format("Failed to process cluster activated message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
new file mode 100644
index 0000000..29556ec
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.cluster.status.*;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * This is to keep track of the processors for the cluster status topic.
+ */
+public class ClusterStatusMessageProcessorChain extends MessageProcessorChain {
+    private static final Log log = LogFactory.getLog(ClusterStatusMessageProcessorChain.class);
+
+
+    private ClusterStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
+    private ClusterStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
+    private ClusterStatusClusterInactivateMessageProcessor clusterInactivateMessageProcessor;
+    private ClusterStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
+    private ClusterStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
+    @Override
+    protected void initialize() {
+        clusterCreatedMessageProcessor = new ClusterStatusClusterCreatedMessageProcessor();
+        add(clusterCreatedMessageProcessor);
+
+        clusterActivatedMessageProcessor = new ClusterStatusClusterActivatedMessageProcessor();
+        add(clusterActivatedMessageProcessor);
+
+        clusterInactivateMessageProcessor = new ClusterStatusClusterInactivateMessageProcessor();
+        add(clusterInactivateMessageProcessor);
+
+        clusterTerminatedMessageProcessor = new ClusterStatusClusterTerminatedMessageProcessor();
+        add(clusterTerminatedMessageProcessor);
+
+        clusterTerminatingMessageProcessor = new ClusterStatusClusterTerminatingMessageProcessor();
+        add(clusterTerminatingMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Cluster status  message processor chain initialized");
+        }
+    }
+
+    @Override
+    public void addEventListener(EventListener eventListener) {
+        if(eventListener instanceof ClusterStatusClusterCreatedEventListener) {
+            clusterCreatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterStatusClusterInactivateEventListener) {
+            clusterInactivateMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterStatusClusterActivatedEventListener) {
+            clusterActivatedMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterStatusClusterTerminatingEventListener) {
+            clusterTerminatingMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof ClusterStatusClusterTerminatedEventListener) {
+            clusterTerminatedMessageProcessor.addEventListener(eventListener);
+        } else {
+            throw new RuntimeException("Unknown event listener " + eventListener.toString());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
deleted file mode 100644
index 341b402..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationActivatedMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application activation even and update the Topology.
- */
-public class ApplicationActivatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationActivatedMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (ApplicationActivatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationActivatedEvent event = (ApplicationActivatedEvent) Util.
-                    jsonToObject(message, ApplicationActivatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(ApplicationActivatedEvent event, Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        } else {
-            // Apply changes to the topology
-            if (!application.isStateTransitionValid(ApplicationStatus.Active)) {
-                log.error("Invalid State transfer from [ " + application.getStatus() +
-                        " ] to [ " + ApplicationStatus.Active + " ]");
-            }
-            application.setStatus(ApplicationStatus.Active);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
deleted file mode 100644
index 079cb90..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationCreatedMessageProcessor.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-public class ApplicationCreatedMessageProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ApplicationCreatedMessageProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-
-        Topology topology = (Topology) object;
-
-        if (ApplicationCreatedEvent.class.getName().equals(type)) {
-            if (!topology.isInitialized()) {
-                return false;
-            }
-
-            ApplicationCreatedEvent event = (ApplicationCreatedEvent) Util.jsonToObject(message, ApplicationCreatedEvent.class);
-            if (event == null) {
-                log.error("Unable to convert the JSON message to ApplicationCreatedEvent");
-                return false;
-            }
-
-            TopologyUpdater.acquireWriteLockForApplications();
-            // since the Clusters will also get modified, acquire write locks for each Service Type
-            Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively();
-            if (clusterDataHolders != null) {
-                for (ClusterDataHolder clusterData : clusterDataHolders) {
-                    TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
-                }
-            }
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                if (clusterDataHolders != null) {
-                    for (ClusterDataHolder clusterData : clusterDataHolders) {
-                        TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
-                    }
-                }
-                TopologyUpdater.releaseWriteLockForApplications();
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (ApplicationCreatedEvent event,Topology topology) {
-
-        // check if required properties are available
-        if (event.getApplication() == null) {
-            String errorMsg = "Application object of application created event is invalid";
-            log.error(errorMsg);
-            throw new RuntimeException(errorMsg);
-        }
-
-        if (event.getApplication().getUniqueIdentifier() == null || event.getApplication().getUniqueIdentifier().isEmpty()) {
-            String errorMsg = "App id of application created event is invalid: [ " + event.getApplication().getUniqueIdentifier() + " ]";
-            log.error(errorMsg);
-            throw new RuntimeException(errorMsg);
-        }
-
-        // check if an Application with same name exists in topology
-        if (topology.applicationExists(event.getApplication().getUniqueIdentifier())) {
-            log.warn("Application with id [ " + event.getApplication().getUniqueIdentifier() + " ] already exists in Topology");
-
-        } else {
-            // add application and the clusters to Topology
-            for(Cluster cluster: event.getClusterList()) {
-                topology.getService(cluster.getServiceName()).addCluster(cluster);
-            }
-            topology.addApplication(event.getApplication());
-        }
-
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
deleted file mode 100644
index 8c88324..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (ApplicationInactivatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
-                    jsonToObject(message, ApplicationInactivatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        } else {
-            // Apply changes to the topology
-            if (!application.isStateTransitionValid(ApplicationStatus.Inactive)) {
-                log.error("Invalid State transfer from [ " + application.getStatus() +
-                        " ] to [ " + ApplicationStatus.Inactive + " ]");
-            }
-            application.setStatus(ApplicationStatus.Inactive);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
deleted file mode 100644
index 2dd3ea7..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (ApplicationTerminatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationTerminatedEvent event = (ApplicationTerminatedEvent) Util.
-                    jsonToObject(message, ApplicationTerminatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplications();
-                        Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
-            if (clusterDataHolders != null) {
-                for (ClusterDataHolder clusterData : clusterDataHolders) {
-                    TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType());
-                }
-            }
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplications();
-                if (clusterDataHolders != null) {
-                    for (ClusterDataHolder clusterData : clusterDataHolders) {
-                        TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType());
-                    }
-                }
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (ApplicationTerminatedEvent event, Topology topology) {
-
-        // check if required properties are available
-        if (event.getAppId() == null) {
-            String errorMsg = "Application Id of application removed event is invalid";
-            log.error(errorMsg);
-            throw new RuntimeException(errorMsg);
-        }
-
-        if (event.getTenantDomain()== null) {
-            String errorMsg = "Application tenant domain of application removed event is invalid";
-            log.error(errorMsg);
-            throw new RuntimeException(errorMsg);
-        }
-
-        // check if an Application with same name exists in topology
-        String appId = event.getAppId();
-        if (topology.applicationExists(appId)) {
-            log.warn("Application with id [ " + appId + " ] still exists in Topology, removing it");
-            topology.removeApplication(appId);
-        }
-
-        if (event.getClusterData() != null) {
-            // remove the Clusters from the Topology
-            for (ClusterDataHolder clusterData : event.getClusterData()) {
-                Service service = topology.getService(clusterData.getServiceType());
-                if (service != null) {
-                    service.removeCluster(clusterData.getClusterId());
-                    if (log.isDebugEnabled()) {
-                        log.debug("Removed the Cluster " + clusterData.getClusterId() + " from Topology");
-                    }
-                }  else {
-                    log.warn("Service " + clusterData.getServiceType() + " not found in Topology!");
-                }
-            }
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("ApplicationRemovedMessageProcessor notifying listener ");
-        }
-
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
deleted file mode 100644
index 032be79..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatingMessageProcessor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor responsible to process the application Inactivation even and update the Topology.
- */
-public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
-    private static final Log log =
-            LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
-
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (ApplicationTerminatingEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            ApplicationTerminatingEvent event = (ApplicationTerminatingEvent) Util.
-                    jsonToObject(message, ApplicationTerminatingEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (ApplicationTerminatingEvent event, Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        } else {
-            // Apply changes to the topology
-            if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
-                log.error("Invalid State transfer from [ " + application.getStatus() +
-                        " ] to [ " + ApplicationStatus.Terminating + " ]");
-            }
-            application.setStatus(ApplicationStatus.Terminating);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
deleted file mode 100644
index f368dd1..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationUndeployedMessageProcessor.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Set;
-
-public class ApplicationUndeployedMessageProcessor extends MessageProcessor {
-
-    private static final Log log = LogFactory.getLog(ApplicationUndeployedMessageProcessor.class);
-
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-
-        Topology topology = (Topology) object;
-
-        if (ApplicationUndeployedEvent.class.getName().equals(type)) {
-            if (!topology.isInitialized()) {
-                return false;
-            }
-
-            ApplicationUndeployedEvent event = (ApplicationUndeployedEvent)
-                    Util.jsonToObject(message, ApplicationUndeployedEvent.class);
-            if (event == null) {
-                log.error("Unable to convert the JSON message to ApplicationUndeployedEvent");
-                return false;
-            }
-
-            // get write lock for the application and relevant Clusters
-            TopologyUpdater.acquireWriteLockForApplication(event.getApplicationId());
-            Set<ClusterDataHolder> clusterDataHolders = event.getClusterData();
-            if (clusterDataHolders != null) {
-                for (ClusterDataHolder clusterData : clusterDataHolders) {
-                    TopologyUpdater.acquireWriteLockForCluster(clusterData.getServiceType(),
-                            clusterData.getClusterId());
-                }
-            }
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                // remove locks
-                if (clusterDataHolders != null) {
-                    for (ClusterDataHolder clusterData : clusterDataHolders) {
-                        TopologyUpdater.releaseWriteLockForCluster(clusterData.getServiceType(),
-                                clusterData.getClusterId());
-                    }
-                }
-                TopologyUpdater.releaseWriteLockForApplication(event.getApplicationId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format
-                    ("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (ApplicationUndeployedEvent event, Topology topology) {
-
-        // update the application status to Terminating
-        Application application = topology.getApplication(event.getApplicationId());
-        // check and update application status to 'Terminating'
-        if (!application.isStateTransitionValid(ApplicationStatus.Terminating)) {
-            log.error("Invalid state transfer from " + application.getStatus() + " to " + ApplicationStatus.Terminating);
-        }
-        // for now anyway update the status forcefully
-        application.setStatus(ApplicationStatus.Terminating);
-
-        // update all the Clusters' statuses to 'Terminating'
-        Set<ClusterDataHolder> clusterData = application.getClusterDataRecursively();
-        // update the Cluster statuses to Terminating
-        for (ClusterDataHolder clusterDataHolder : clusterData) {
-            Service service = topology.getService(clusterDataHolder.getServiceType());
-            if (service != null) {
-                Cluster aCluster = service.getCluster(clusterDataHolder.getClusterId());
-                if (aCluster != null) {
-                    // validate state transition
-                    if (!aCluster.isStateTransitionValid(ClusterStatus.Terminating)) {
-                        log.error("Invalid state transfer from " + aCluster.getStatus() + " to "
-                                + ClusterStatus.Terminating);
-                    }
-                    // for now anyway update the status forcefully
-                    aCluster.setStatus(ClusterStatus.Terminating);
-
-                } else {
-                    log.warn("Unable to find Cluster with cluster id " + clusterDataHolder.getClusterId() +
-                            " in Topology");
-                }
-
-            } else {
-                log.warn("Unable to remove cluster with cluster id: " + clusterDataHolder.getClusterId() + " from Topology, " +
-                        " associated Service [ " + clusterDataHolder.getServiceType() + " ] npt found");
-            }
-        }
-
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
deleted file mode 100644
index 09b9062..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupActivatedProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupActivatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupActivatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (GroupActivatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupActivatedEvent event = (GroupActivatedEvent) Util.
-                    jsonToObject(message, GroupActivatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (GroupActivatedEvent event,Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-            }
-        } else {
-            // Apply changes to the topology
-            if (!group.isStateTransitionValid(GroupStatus.Active)) {
-                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
-            }
-            group.setStatus(GroupStatus.Active);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
deleted file mode 100644
index d053f5d..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupCreatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (GroupCreatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupCreatedEvent event = (GroupCreatedEvent) Util.
-                    jsonToObject(message, GroupCreatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (GroupCreatedEvent event,Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-            }
-        } else {
-            // Apply changes to the topology
-            if (!group.isStateTransitionValid(GroupStatus.Created)) {
-                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Created  + " " +
-                        "for Group " + group.getAlias());
-            }
-            group.setStatus(GroupStatus.Created);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
deleted file mode 100644
index add40b4..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupInActivateProcessor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupInactivateEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupInActivateProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupInActivateProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (GroupInactivateEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupInactivateEvent event = (GroupInactivateEvent) Util.
-                    jsonToObject(message, GroupInactivateEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess(GroupInactivateEvent event, Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-            }
-        } else {
-            group.setStatus(GroupStatus.Inactive);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Group updated as in-activated : %s",
-                        group.getUniqueIdentifier()));
-            }
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
deleted file mode 100644
index 767ff71..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatedProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (GroupTerminatedEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
-                    jsonToObject(message, GroupTerminatedEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (GroupTerminatedEvent event,Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-            }
-        } else {
-            // Apply changes to the topology
-            if (!group.isStateTransitionValid(GroupStatus.Terminated)) {
-                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Terminated);
-            }
-            group.setStatus(GroupStatus.Terminated);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
deleted file mode 100644
index 5b15532..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.processor.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.Group;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.topology.Topology;
-import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
-import org.apache.stratos.messaging.message.processor.MessageProcessor;
-import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
-import org.apache.stratos.messaging.util.Util;
-
-/**
- * This processor will act upon the Group activation events
- */
-public class GroupTerminatingProcessor extends MessageProcessor {
-    private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
-    private MessageProcessor nextProcessor;
-
-    @Override
-    public void setNext(MessageProcessor nextProcessor) {
-        this.nextProcessor = nextProcessor;
-    }
-
-    @Override
-    public boolean process(String type, String message, Object object) {
-        Topology topology = (Topology) object;
-
-        if (GroupTerminatingEvent.class.getName().equals(type)) {
-            // Return if topology has not been initialized
-            if (!topology.isInitialized())
-                return false;
-
-            // Parse complete message and build event
-            GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
-                    jsonToObject(message, GroupTerminatingEvent.class);
-
-            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
-
-            try {
-                return doProcess(event, topology);
-
-            } finally {
-                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
-            }
-
-        } else {
-            if (nextProcessor != null) {
-                // ask the next processor to take care of the message.
-                return nextProcessor.process(type, message, topology);
-            } else {
-                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
-            }
-        }
-    }
-
-    private boolean doProcess (GroupTerminatingEvent event,Topology topology) {
-
-        // Validate event against the existing topology
-        Application application = topology.getApplication(event.getAppId());
-        if (application == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Application does not exist: [service] %s",
-                        event.getAppId()));
-            }
-            return false;
-        }
-        Group group = application.getGroupRecursively(event.getGroupId());
-
-        if (group == null) {
-            if (log.isWarnEnabled()) {
-                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
-                        event.getGroupId()));
-            }
-        } else {
-            // Apply changes to the topology
-            if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
-                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
-            }
-            group.setStatus(GroupStatus.Terminating);
-
-        }
-
-        // Notify event listeners
-        notifyEventListeners(event);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index d8d25d9..048e9a3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -22,7 +22,6 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 
@@ -47,17 +46,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor;
     private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
     private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
-    private GroupCreatedProcessor groupCreatedProcessor;
-    private GroupActivatedProcessor groupActivatedProcessor;
-    private GroupInActivateProcessor groupInActivateProcessor;
-    private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
-    private ApplicationUndeployedMessageProcessor applicationUndeployedMessageProcessor;
-    private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
-    private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
-    private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
-    private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
-    private GroupTerminatingProcessor groupTerminatingProcessor;
-    private GroupTerminatedProcessor groupTerminatedProcessor;
     private ClusterTerminatingProcessor clusterTerminatingProcessor;
     private ClusterTerminatedProcessor clusterTerminatedProcessor;
 
@@ -111,39 +99,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor();
         add(memberTerminatedMessageProcessor);
 
-        groupCreatedProcessor = new GroupCreatedProcessor();
-        add(groupCreatedProcessor);
-
-        groupActivatedProcessor = new GroupActivatedProcessor();
-        add(groupActivatedProcessor);
-
-        groupInActivateProcessor = new GroupInActivateProcessor();
-        add(groupInActivateProcessor);
-
-        groupTerminatingProcessor = new GroupTerminatingProcessor();
-        add(groupTerminatingProcessor);
-
-        groupTerminatedProcessor = new GroupTerminatedProcessor();
-        add(groupTerminatedProcessor);
-
-        applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
-        add(applicationCreatedMessageProcessor);
-
-        applicationUndeployedMessageProcessor = new ApplicationUndeployedMessageProcessor();
-        add(applicationUndeployedMessageProcessor);
-
-        applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
-        add(applicationActivatedMessageProcessor);
-
-        applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
-        add(applicationInactivatedMessageProcessor);
-
-        applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
-        add(applicationTerminatedMessageProcessor);
-
-        applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
-        add(applicationTerminatingMessageProcessor);
-
         if (log.isDebugEnabled()) {
             log.debug("Topology message processor chain initialized X1");
         }
@@ -182,30 +137,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             serviceRemovedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof  MemberMaintenanceListener) {
             memberMaintenanceModeProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupActivatedEventListener) {
-            groupActivatedProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupCreatedEventListener) {
-            groupCreatedProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupInActivateEventListener) {
-            groupInActivateProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupTerminatedEventListener){
-            groupTerminatedProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof GroupTerminatingEventListener){
-            groupTerminatingProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationCreatedEventListener) {
-            applicationCreatedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationUndeployedEventListener) {
-            applicationUndeployedMessageProcessor.addEventListener(eventListener);
-        } else if (eventListener instanceof ApplicationActivatedEventListener) {
-            applicationActivatedMessageProcessor.addEventListener(eventListener);
-        } else  if (eventListener instanceof ApplicationInActivateEventListener){
-            applicationInactivatedMessageProcessor.addEventListener(eventListener);
-        } else if(eventListener instanceof ApplicationTerminatedEventListener){
-            applicationTerminatedMessageProcessor.addEventListener(eventListener);
-        } else if(eventListener instanceof ApplicationTerminatingEventListener){
-            applicationTerminatingMessageProcessor.addEventListener(eventListener);
-        }
-        else {
+        } else {
             throw new RuntimeException("Unknown event listener");
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
deleted file mode 100644
index 9eda9e0..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageDelegator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-import org.apache.stratos.messaging.message.processor.application.status.AppStatusMessageProcessorChain;
-import org.apache.stratos.messaging.util.Constants;
-
-import javax.jms.TextMessage;
-
-public class ApplicationStatusEventMessageDelegator implements Runnable {
-    private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageDelegator.class);
-
-    private ApplicationStatusEventMessageQueue messageQueue;
-    private MessageProcessorChain processorChain;
-    private boolean terminated;
-
-    public ApplicationStatusEventMessageDelegator(ApplicationStatusEventMessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-        this.processorChain = new AppStatusMessageProcessorChain();
-    }
-
-    public void addEventListener(EventListener eventListener) {
-        processorChain.addEventListener(eventListener);
-    }
-
-    @Override
-    public void run() {
-        try {
-            if (log.isInfoEnabled()) {
-                log.info("Application status event message delegator started");
-            }
-
-            while (!terminated) {
-                try {
-                    TextMessage message = messageQueue.take();
-
-                    // Retrieve the header
-                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
-
-                    // Retrieve the actual message
-                    String json = message.getText();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Application status event message received from queue: %s", type));
-                    }
-
-                    // Delegate message to message processor chain
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Delegating application status event message: %s", type));
-                    }
-                    processorChain.process(type, json, null);
-                } catch (Exception e) {
-                    log.error("Failed to retrieve application status event message", e);
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Application status event message delegator failed", e);
-            }
-        }
-    }
-
-    /**
-     * Terminate topology event message delegator thread.
-     */
-    public void terminate() {
-        terminated = true;
-    }
-
-
-    private EventMessage jsonToEventMessage(String json) {
-
-        EventMessage event = new EventMessage();
-        String message;
-
-        //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
-        //message
-        String[] MessageParts = json.split(":", 3);
-
-        String eventType = MessageParts[0].trim();
-        eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
-        if(log.isDebugEnabled()){
-            log.debug(String.format("Extracted [event type] %s", eventType));
-        }
-
-        event.setEventName(eventType);
-        String messageTag = MessageParts[1];
-        messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
-
-        if("message".equals(messageTag)){
-            message = MessageParts[2].trim();
-            //Remove trailing bracket twice to get the message
-            message = message.substring(0, message.lastIndexOf("}")).trim();
-            message = message.substring(0, message.lastIndexOf("}")).trim();
-            if(message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1){
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("[Extracted message] %s ", message));
-                }
-                event.setMessage(message);
-                return event;
-            }
-        }
-        return null;
-    }
-
-    private class EventMessage {
-        private String eventName;
-        private String message;
-
-        private String getEventName() {
-            return eventName;
-        }
-
-        private void setEventName(String eventName) {
-            this.eventName = eventName;
-        }
-
-        public String getMessage() {
-            return message;
-        }
-
-        public void setMessage(String message) {
-            this.message = message;
-        }
-    }
-}


Mime
View raw message