stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ud...@apache.org
Subject git commit: groupTerminating and groupTerminated events, listenrs and prcessors
Date Wed, 22 Oct 2014 12:15:49 GMT
Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 855039889 -> 773a01eaa


groupTerminating and groupTerminated events, listenrs and prcessors


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/773a01ea
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/773a01ea
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/773a01ea

Branch: refs/heads/4.0.0-grouping
Commit: 773a01eaa519e77fa01e3eebd1d616e905105953
Parents: 8550398
Author: Udara Liyanage <udara@wso2.com>
Authored: Wed Oct 22 17:44:43 2014 +0530
Committer: Udara Liyanage <udara@wso2.com>
Committed: Wed Oct 22 17:45:26 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        |  49 ++++++++-
 .../ApplicationStatusTopicReceiver.java         |  20 ++++
 .../controller/topology/TopologyBuilder.java    |  80 +++++++++++++-
 .../topology/TopologyEventPublisher.java        |  21 +++-
 .../status/GroupInInactivateEvent.java          |  44 ++++++++
 .../status/GroupInTerminatedEvent.java          |  44 ++++++++
 .../status/GroupInTerminatingEvent.java         |  44 ++++++++
 .../event/topology/GroupInactivatedEvent.java   |  43 ++++++++
 .../event/topology/GroupTerminatedEvent.java    |  43 ++++++++
 .../event/topology/GroupTerminatingEvent.java   |  43 ++++++++
 .../status/GroupInTerminatedEventListener.java  |  27 +++++
 .../status/GroupInTerminatingEventListener.java |  27 +++++
 .../topology/GroupTerminatedEventListener.java  |  27 +++++
 .../topology/GroupTerminatingEventListener.java |  27 +++++
 ...onStatusGroupTerminatedMessageProcessor.java |  61 ++++++++++
 ...nStatusGroupTerminatingMessageProcessor.java |  61 ++++++++++
 .../ApplicationStatusMessageProcessorChain.java |  15 ++-
 .../topology/GroupTerminatedProcessor.java      | 110 +++++++++++++++++++
 .../topology/GroupTerminatingProcessor.java     | 110 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |  22 ++--
 20 files changed, 900 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 2f31d1b..7ae7df0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -35,9 +35,14 @@ import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
 import org.apache.stratos.autoscaler.partition.PartitionManager;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
 import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.application.status.GroupInTerminatingEvent;
 import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.application.status.GroupInTerminatingEventListener;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -238,6 +243,48 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             }
         });
 
+        topologyEventReceiver.addEventListener(new GroupTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupTerminatedEven] Received: " + event.getClass());
+
+                GroupTerminatingEvent groupTerminatingEvent = (GroupTerminatingEvent) event;
+                String appId = groupTerminatingEvent.getAppId();
+                String groupId = groupTerminatingEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                monitor.setStatus(GroupStatus.Terminating);
+
+                //starting the status checker to decide on the status of it's parent
+                //StatusChecker.getInstance().onGroupStatusChange(groupId, appId);
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new GroupTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupTerminatedEven] Received: " + event.getClass());
+
+                GroupTerminatedEvent groupTerminatedEvent = (GroupTerminatedEvent) event;
+                String appId = groupTerminatedEvent.getAppId();
+                String groupId = groupTerminatedEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                monitor.setStatus(GroupStatus.Terminated);
+
+                //starting the status checker to decide on the status of it's parent
+                //StatusChecker.getInstance().onGroupStatusChange(groupId, appId);
+            }
+        });
+
         topologyEventReceiver.addEventListener(new ApplicationActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java
index cfcea8a..bfd0167 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java
@@ -23,9 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.application.status.*;
+import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
 import org.apache.stratos.messaging.listener.application.status.*;
 import org.apache.stratos.messaging.listener.topology.ClusterActivatedEventListener;
 import org.apache.stratos.messaging.listener.topology.GroupActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.GroupTerminatedEventListener;
+import org.apache.stratos.messaging.listener.topology.GroupTerminatingEventListener;
 import org.apache.stratos.messaging.message.receiver.application.status.ApplicationStatusEventReceiver;
 
 public class ApplicationStatusTopicReceiver implements Runnable {
@@ -82,6 +86,22 @@ public class ApplicationStatusTopicReceiver implements Runnable {
             }
         });
 
+        statusEventReceiver.addEventListener(new GroupTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyBuilder.handleGroupTerminatedEvent((GroupInTerminatedEvent) event);
+
+            }
+        });
+
+        statusEventReceiver.addEventListener(new GroupTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                TopologyBuilder.handleGroupTerminatingEvent((GroupInTerminatingEvent) event);
+
+            }
+        });
+
         statusEventReceiver.addEventListener(new ApplicationActivatedEventListener() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index c3286c5..cee40f3 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -32,14 +32,18 @@ import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder;
 import org.apache.stratos.messaging.event.application.status.*;
+import org.apache.stratos.messaging.event.application.status.ApplicationActivatedEvent;
+import org.apache.stratos.messaging.event.application.status.ApplicationCreatedEvent;
+import org.apache.stratos.messaging.event.application.status.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.event.application.status.ApplicationTerminatedEvent;
+import org.apache.stratos.messaging.event.application.status.ApplicationTerminatingEvent;
+import org.apache.stratos.messaging.event.application.status.ClusterActivatedEvent;
+import org.apache.stratos.messaging.event.application.status.GroupActivatedEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
 import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.event.topology.*;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
 import java.util.*;
@@ -1022,4 +1026,72 @@ public class TopologyBuilder {
             TopologyManager.releaseWriteLock();
         }
     }
+
+    public static void handleGroupTerminatedEvent(GroupInTerminatedEvent event) {
+        Topology topology = TopologyManager.getTopology();
+        Application application = topology.getApplication(event.getAppId());
+        //update the status of the Group
+        if (application == null) {
+            log.warn(String.format("Application %s does not exist",
+                    event.getAppId()));
+            return;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            log.warn(String.format("Group %s does not exist",
+                    event.getGroupId()));
+            return;
+        }
+
+        org.apache.stratos.messaging.event.topology.GroupTerminatedEvent groupTerminatedTopologyEvent =
+                new org.apache.stratos.messaging.event.topology.GroupTerminatedEvent(
+                        event.getAppId(),
+                        event.getGroupId());
+        try {
+            TopologyManager.acquireWriteLock();
+            group.setStatus(GroupStatus.Terminated);
+            log.info("Group activated adding status started");
+
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        //publishing data
+        TopologyEventPublisher.sendGroupTerminatedEvent(groupTerminatedTopologyEvent);
+    }
+
+    public static void handleGroupTerminatingEvent(GroupInTerminatingEvent event) {
+        Topology topology = TopologyManager.getTopology();
+        Application application = topology.getApplication(event.getAppId());
+        //update the status of the Group
+        if (application == null) {
+            log.warn(String.format("Application %s does not exist",
+                    event.getAppId()));
+            return;
+        }
+
+        Group group = application.getGroupRecursively(event.getGroupId());
+        if (group == null) {
+            log.warn(String.format("Group %s does not exist",
+                    event.getGroupId()));
+            return;
+        }
+
+        org.apache.stratos.messaging.event.topology.GroupTerminatingEvent groupTerminatingTopologyEvent =
+                new org.apache.stratos.messaging.event.topology.GroupTerminatingEvent(
+                        event.getAppId(),
+                        event.getGroupId());
+        try {
+            TopologyManager.acquireWriteLock();
+            group.setStatus(GroupStatus.Terminating);
+            log.info("Group activated adding status started");
+
+            TopologyManager.updateTopology(topology);
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+        //publishing data
+        TopologyEventPublisher.sendGroupTerminatingEvent(groupTerminatingTopologyEvent);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index 93523a4..f6b247b 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -26,11 +26,6 @@ import org.apache.stratos.cloud.controller.pojo.PortMapping;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ConfigCompositeApplication;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.ServiceType;
-import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
@@ -290,4 +285,20 @@ public class TopologyEventPublisher {
         }
         publishEvent(applicationTerminatedEvent);
     }
+
+    public static void sendGroupTerminatedEvent(GroupTerminatedEvent groupTerminatedTopologyEvent) {
+        if(log.isInfoEnabled()) {
+            log.info(String.format("Publishing group terminated event: [appId] %s",
+                    groupTerminatedTopologyEvent.getAppId()));
+        }
+        publishEvent(groupTerminatedTopologyEvent);
+    }
+
+    public static void sendGroupTerminatingEvent(GroupTerminatingEvent groupTerminatingTopologyEvent) {
+        if(log.isInfoEnabled()) {
+            log.info(String.format("Publishing group terminating event: [appId] %s",
+                    groupTerminatingTopologyEvent.getAppId()));
+        }
+        publishEvent(groupTerminatingTopologyEvent);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java
new file mode 100644
index 0000000..6aeada7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class GroupInInactivateEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private String groupId;
+    private String appId;
+
+    public GroupInInactivateEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java
new file mode 100644
index 0000000..16cc0c7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class GroupInTerminatedEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private String groupId;
+    private String appId;
+
+    public GroupInTerminatedEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java
new file mode 100644
index 0000000..6eaf5c3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class GroupInTerminatingEvent extends StatusEvent {
+    private static final long serialVersionUID = 2625412714611885089L;
+
+    private String groupId;
+    private String appId;
+
+    public GroupInTerminatingEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java
new file mode 100644
index 0000000..176f709
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupInactivatedEvent extends Event {
+    private String appId;
+    private String groupId;
+
+    public GroupInactivatedEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java
new file mode 100644
index 0000000..667343c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupTerminatedEvent extends Event {
+    private String appId;
+    private String groupId;
+
+    public GroupTerminatedEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java
new file mode 100644
index 0000000..31e0cc5
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupTerminatingEvent extends Event {
+    private String appId;
+    private String groupId;
+
+    public GroupTerminatingEvent(String appId, String groupId) {
+        this.appId = appId;
+        this.groupId = groupId;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java
new file mode 100644
index 0000000..8cb331c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Created by reka on 9/22/14.
+ */
+public abstract class GroupInTerminatedEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java
new file mode 100644
index 0000000..20c8b68
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Created by reka on 9/22/14.
+ */
+public abstract class GroupInTerminatingEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java
new file mode 100644
index 0000000..af409e7
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * This will get triggered by the groups activation processor after processing the event
+ */
+public abstract class GroupTerminatedEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java
new file mode 100644
index 0000000..ab4042f
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * This will get triggered by the groups activation processor after processing the event
+ */
+public abstract class GroupTerminatingEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java
new file mode 100644
index 0000000..93dd750
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.application.status.GroupInTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationStatusGroupTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationStatusGroupTerminatedMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupInTerminatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupInTerminatedEvent event =
+                    (GroupInTerminatedEvent) Util.jsonToObject(message, GroupInTerminatedEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in GroupTerminatingEvent message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java
new file mode 100644
index 0000000..493bd6c
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class ApplicationStatusGroupTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationStatusGroupTerminatingMessageProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (GroupTerminatingEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            GroupTerminatingEvent event =
+                    (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Received GroupTerminatingEvent: " + event.toString());
+            }
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        } else {
+            if (nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            } else {
+                throw new RuntimeException(
+                        String.format("Failed to process group in GroupTerminatingEvent message " +
+                                "using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java
index 3c32de8..d5dccbb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java
@@ -43,6 +43,9 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai
     private ApplicationStatusAppTerminatedMessageProcessor applicationStatusAppTerminatedMessageProcessor;
     private ApplicationStatusAppTerminatingMessageProcessor applicationStatusAppTerminatingMessageProcessor;
 
+    private ApplicationStatusGroupTerminatedMessageProcessor groupTerminatedMessageProcessor;
+    private ApplicationStatusGroupTerminatingMessageProcessor groupTerminatingMessageProcessor;
+
     public void initialize() {
         // Add instance notifier event processors
         clusterActivatedMessageProcessor = new ApplicationStatusClusterActivatedMessageProcessor();
@@ -72,6 +75,11 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai
         applicationStatusAppTerminatingMessageProcessor = new ApplicationStatusAppTerminatingMessageProcessor();
         this.add(applicationStatusAppTerminatingMessageProcessor);
 
+        groupTerminatedMessageProcessor = new ApplicationStatusGroupTerminatedMessageProcessor();
+        this.add(groupTerminatedMessageProcessor);
+
+        groupTerminatingMessageProcessor = new ApplicationStatusGroupTerminatingMessageProcessor();
+        this.add(groupTerminatingMessageProcessor);
 
         if (log.isDebugEnabled()) {
             log.debug("Instance notifier message processor chain initialized");
@@ -97,7 +105,12 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai
             applicationStatusAppTerminatingMessageProcessor.addEventListener(eventListener);
         } else if(eventListener instanceof ApplicationTerminatedEventListener){
             applicationStatusAppTerminatedMessageProcessor.addEventListener(eventListener);
-        } else {
+        } else if (eventListener instanceof GroupInTerminatingEventListener){
+            groupTerminatingMessageProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof  GroupInTerminatedEventListener){
+            groupTerminatedMessageProcessor.addEventListener(eventListener);
+        } else
+        {
             throw new RuntimeException("Unknown event listener");
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
new file mode 100644
index 0000000..6196776
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.Group;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupTerminatedProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (GroupTerminatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupTerminatedEvent event = (GroupTerminatedEvent) Util.
+                    jsonToObject(message, GroupTerminatedEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (GroupTerminatedEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            if (!group.isStateTransitionValid(GroupStatus.Terminating)) {
+                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+            }
+            group.setStatus(GroupStatus.Terminated);
+            if (log.isInfoEnabled())     {
+                log.info(String.format("Group updated as activated : %s",
+                        group.getUniqueIdentifier()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
new file mode 100644
index 0000000..afec0f2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.Group;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupTerminatingProcessor extends MessageProcessor {
+    private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class);
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (GroupTerminatingEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            GroupTerminatingEvent event = (GroupTerminatingEvent) Util.
+                    jsonToObject(message, GroupTerminatingEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (GroupTerminatingEvent event,Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        }
+        Group group = application.getGroupRecursively(event.getGroupId());
+
+        if (group == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+                        event.getGroupId()));
+            }
+        } else {
+            // Apply changes to the topology
+            if (!group.isStateTransitionValid(GroupStatus.Active)) {
+                log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+            }
+            group.setStatus(GroupStatus.Terminating);
+            if (log.isInfoEnabled())     {
+                log.info(String.format("Group updated as Terminating : %s",
+                        group.getUniqueIdentifier()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 77e4b01..1fb3961 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -22,14 +22,10 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.listener.application.status.*;
+import org.apache.stratos.messaging.listener.application.status.ApplicationInActivatedEventListener;
+import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatedEventListener;
+import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatingEventListener;
 import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.listener.topology.ApplicationActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ApplicationCreatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterInActivateEventListener;
-import org.apache.stratos.messaging.listener.topology.GroupActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.GroupInActivateEventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 
 /**
@@ -62,6 +58,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
     private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
     private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
     private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
+    private GroupTerminatingProcessor groupTerminatingProcessor;
+    private GroupTerminatedProcessor groupTerminatedProcessor;
 
     public void initialize() {
         // Add topology event processors
@@ -113,6 +111,12 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
         groupInActivateProcessor = new GroupInActivateProcessor();
         add(groupInActivateProcessor);
 
+        groupTerminatingProcessor = new GroupTerminatingProcessor();
+        add(groupTerminatingProcessor);
+
+        groupTerminatedProcessor = new GroupTerminatedProcessor();
+        add(groupTerminatedProcessor);
+
         applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor();
         add(applicationCreatedMessageProcessor);
 
@@ -172,6 +176,10 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
             groupActivatedProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof GroupInActivateEventListener) {
             groupInActivateProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupTerminatedEventListener){
+            groupTerminatedProcessor.addEventListener(eventListener);
+        } else if (eventListener instanceof GroupTerminatingEventListener){
+            groupTerminatingProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationCreatedEventListener) {
             applicationCreatedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationUndeployedEventListener) {


Mime
View raw message