stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From im...@apache.org
Subject stratos git commit: Adding topology application filter
Date Tue, 30 Jun 2015 01:42:40 GMT
Repository: stratos
Updated Branches:
  refs/heads/master 99182790b -> b7897af9c


Adding topology application filter


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b7897af9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b7897af9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b7897af9

Branch: refs/heads/master
Commit: b7897af9c971dc51944ff5dacf6118a349f4a2bc
Parents: 9918279
Author: Imesh Gunaratne <imesh@apache.org>
Authored: Tue Jun 30 07:12:08 2015 +0530
Committer: Imesh Gunaratne <imesh@apache.org>
Committed: Tue Jun 30 07:12:33 2015 +0530

----------------------------------------------------------------------
 components/org.apache.stratos.common/pom.xml    |  1 +
 .../common/constants/StratosConstants.java      |  1 +
 .../topology/TopologyApplicationFilter.java     | 98 ++++++++++++++++++++
 .../filter/topology/TopologyClusterFilter.java  |  8 +-
 .../filter/topology/TopologyMemberFilter.java   | 29 +-----
 .../filter/topology/TopologyServiceFilter.java  |  4 +-
 ...licationClustersCreatedMessageProcessor.java |  7 ++
 .../ClusterCreatedMessageProcessor.java         |  7 ++
 .../ClusterInstanceActivatedProcessor.java      |  7 ++
 .../ClusterInstanceCreatedMessageProcessor.java |  7 ++
 .../ClusterInstanceInactivateProcessor.java     |  7 ++
 .../ClusterInstanceTerminatedProcessor.java     |  7 ++
 .../ClusterInstanceTerminatingProcessor.java    |  7 ++
 .../ClusterRemovedMessageProcessor.java         |  9 ++
 .../topology/ClusterResetMessageProcessor.java  |  7 ++
 .../CompleteTopologyMessageProcessor.java       | 10 +-
 .../MemberActivatedMessageProcessor.java        |  7 ++
 .../topology/MemberCreatedMessageProcessor.java |  7 ++
 .../MemberInitializedMessageProcessor.java      |  7 ++
 .../MemberMaintenanceModeProcessor.java         |  7 ++
 .../MemberReadyToShutdownMessageProcessor.java  |  7 ++
 .../topology/MemberStartedMessageProcessor.java |  7 ++
 .../MemberSuspendedMessageProcessor.java        |  7 ++
 .../MemberTerminatedMessageProcessor.java       |  8 ++
 .../messaging/test/MessageFilterTest.java       | 19 ++--
 25 files changed, 252 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 316aef1..a957d68 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -47,6 +47,7 @@
                         <Bundle-Name>${project.artifactId}</Bundle-Name>
                         <Export-Package>
                             org.apache.stratos.common.*,
+                            org.apache.stratos.common.constants.*,
                             org.apache.stratos.common.domain.*,
                             org.apache.stratos.common.client.*,
                             org.apache.stratos.common.services.*,

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
index 194bd81..1275f5c 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
@@ -168,6 +168,7 @@ public class StratosConstants {
     public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout";
 
     public static final String FILTER_VALUE_SEPARATOR = ",";
+    public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter";
     public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter";
     public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter";
     public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter";

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java
new file mode 100644
index 0000000..256bda6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyApplicationFilter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.filter.topology;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+
+import java.util.Collection;
+
+/**
+ * A filter to discard topology events whose application id is not in a given application id list.
+ * Callers use the static {@link #apply(String)}, which works on a lazily created shared instance.
+ */
+public class TopologyApplicationFilter extends MessageFilter {
+
+    private static final Log log = LogFactory.getLog(TopologyApplicationFilter.class);
+
+    public static final String TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID = "application-id";
+
+    private static volatile TopologyApplicationFilter instance;
+
+    public TopologyApplicationFilter() {
+        super(StratosConstants.TOPOLOGY_APPLICATION_FILTER);
+    }
+
+    /**
+     * Returns true if the application is excluded by the active filter; false if inactive or the id is blank.
+     *
+     * @param applicationId application id of the topology event being processed; may be blank
+     * @return true if the event should be discarded, false otherwise
+     */
+    public static boolean apply(String applicationId) {
+        boolean excluded = false;
+        if (getInstance().isActive()) {
+            if (StringUtils.isNotBlank(applicationId) && getInstance().applicationExcluded(applicationId)) {
+                excluded = true;
+            }
+            if (excluded && log.isInfoEnabled()) {
+                log.info(String.format("Application is excluded: [application-id] %s", applicationId));
+            }
+        }
+        return excluded;
+    }
+
+    public static TopologyApplicationFilter getInstance() {
+        if (instance == null) {
+            synchronized (TopologyApplicationFilter.class) {
+                if (instance == null) {
+                    instance = new TopologyApplicationFilter();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Topology application filter instance created");
+                    }
+                }
+            }
+        }
+        return instance;
+    }
+
+    private boolean applicationExcluded(String value) {
+        return excluded(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID, value);
+    }
+
+    private Collection<String> getIncludedApplicationIds() {
+        return getIncludedPropertyValues(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID);
+    }
+
+    /** Renders the shared instance's included application ids as "application-id=id1, id2". */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID + "=");
+        String separator = ""; // empty before the first id so no ", " follows the "="
+        for (String applicationId : TopologyApplicationFilter.getInstance().getIncludedApplicationIds()) {
+            sb.append(separator).append(applicationId);
+            separator = ", ";
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
index a34d072..350a142 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyClusterFilter.java
@@ -22,6 +22,7 @@ package org.apache.stratos.messaging.message.filter.topology;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.message.filter.MessageFilter;
 
 import java.util.Collection;
@@ -34,12 +35,11 @@ public class TopologyClusterFilter extends MessageFilter {
     private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
 
     public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = "cluster-id";
-    public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter";
 
     private static volatile TopologyClusterFilter instance;
 
     public TopologyClusterFilter() {
-        super(TOPOLOGY_CLUSTER_FILTER);
+        super(StratosConstants.TOPOLOGY_CLUSTER_FILTER);
     }
 
     /**
@@ -54,8 +54,8 @@ public class TopologyClusterFilter extends MessageFilter {
             if (StringUtils.isNotBlank(clusterId) && getInstance().clusterExcluded(clusterId)) {
                 excluded = true;
             }
-            if (excluded && log.isDebugEnabled()) {
-                log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId));
+            if (excluded && log.isInfoEnabled()) {
+                log.info(String.format("Cluster is excluded: [cluster-id] %s", clusterId));
             }
         }
         return excluded;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
index 5bdab83..32d7c25 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyMemberFilter.java
@@ -23,6 +23,7 @@ package org.apache.stratos.messaging.message.filter.topology;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.message.filter.MessageFilter;
 
 import java.util.Collection;
@@ -33,16 +34,13 @@ import java.util.Collection;
 public class TopologyMemberFilter extends MessageFilter {
 
     private static final Log log = LogFactory.getLog(TopologyServiceFilter.class);
-    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
 
-    public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = "lb-cluster-id";
     public static final String TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID = "network-partition-id";
-    public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter";
 
     private static volatile TopologyMemberFilter instance;
 
     public TopologyMemberFilter() {
-        super(TOPOLOGY_MEMBER_FILTER);
+        super(StratosConstants.TOPOLOGY_MEMBER_FILTER);
     }
 
     /**
@@ -55,14 +53,11 @@ public class TopologyMemberFilter extends MessageFilter {
     public static boolean apply(String lbClusterId, String networkPartitionId) {
         boolean excluded = false;
         if (getInstance().isActive()) {
-            if (StringUtils.isNotBlank(lbClusterId) && getInstance().lbClusterIdExcluded(lbClusterId)) {
-                excluded = true;
-            }
             if (StringUtils.isNotBlank(networkPartitionId) && getInstance().networkPartitionExcluded(networkPartitionId)) {
                 excluded = true;
             }
-            if (excluded && log.isDebugEnabled()) {
-                log.debug(String.format("Member is excluded: [lb-cluster] %s", lbClusterId));
+            if (excluded && log.isInfoEnabled()) {
+                log.info(String.format("Member is excluded: [network-partition-id] %s", networkPartitionId));
             }
         }
         return excluded;
@@ -82,14 +77,6 @@ public class TopologyMemberFilter extends MessageFilter {
         return instance;
     }
 
-    private boolean lbClusterIdExcluded(String value) {
-        return excluded(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID, value);
-    }
-
-    private Collection<String> getIncludedLbClusterIds() {
-        return getIncludedPropertyValues(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID);
-    }
-
     private boolean networkPartitionExcluded(String value) {
         return excluded(TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID, value);
     }
@@ -101,14 +88,6 @@ public class TopologyMemberFilter extends MessageFilter {
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append(TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID + "=");
-        for (String clusterId : getInstance().getIncludedLbClusterIds()) {
-            if (sb.length() > 0) {
-                sb.append(", ");
-            }
-            sb.append(clusterId);
-        }
-        sb.append(LINE_SEPARATOR);
         sb.append(TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "=");
         for (String networkPartitionId : getInstance().getIncludedNetworkPartitionIds()) {
             if (sb.length() > 0) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
index d74b6cc..63bb7b9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/filter/topology/TopologyServiceFilter.java
@@ -54,8 +54,8 @@ public class TopologyServiceFilter extends MessageFilter {
             if (StringUtils.isNotBlank(serviceName) && getInstance().serviceExcluded(serviceName)) {
                 excluded = true;
             }
-            if (excluded && log.isDebugEnabled()) {
-                log.debug(String.format("Service is excluded: [lb-cluster] %s", serviceName));
+            if (excluded && log.isInfoEnabled()) {
+                log.info(String.format("Service is excluded: [service-name] %s", serviceName));
             }
         }
         return excluded;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
index 374ab0f..dcae73e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationClustersCreatedMessageProcessor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ApplicationClustersCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -73,10 +74,16 @@ public class ApplicationClustersCreatedMessageProcessor extends MessageProcessor
         List<Cluster> clusters = event.getClusterList();
 
         for (Cluster cluster : clusters) {
+            String applicationId = cluster.getAppId();
             String serviceName = cluster.getServiceName();
             String clusterId = cluster.getClusterId();
             TopologyUpdater.acquireWriteLockForService(serviceName);
+
             try {
+                // Apply application filter
+                if(TopologyApplicationFilter.apply(applicationId)) {
+                    continue;
+                }
 
                 // Apply service filter
                 if (TopologyServiceFilter.apply(serviceName)) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
index 036fe19..0e14bcd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -73,9 +74,15 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterCreatedEvent event, Topology topology) {
         Cluster cluster = event.getCluster();
+        String applicationId = cluster.getAppId();
         String serviceName = cluster.getServiceName();
         String clusterId = cluster.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java
index 09d8d84..3d67d72 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceActivatedProcessor.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.ClusterInstanceActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -80,9 +81,15 @@ public class ClusterInstanceActivatedProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterInstanceActivatedEvent event, Topology topology) {
 
+        String applicationId = event.getAppId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
index d7e4606..8fb28cb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterInstanceCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -97,8 +98,14 @@ public class ClusterInstanceCreatedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
         Cluster cluster = service.getCluster(event.getClusterId());
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         if (cluster == null) {
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java
index 319d71b..c86efe3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceInactivateProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterInstanceInactivateEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -79,9 +80,15 @@ public class ClusterInstanceInactivateProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterInstanceInactivateEvent event, Topology topology) {
 
+        String applicationId = event.getAppId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java
index 3a56440..e307bd8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatedProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterInstanceTerminatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -79,9 +80,15 @@ public class ClusterInstanceTerminatedProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterInstanceTerminatedEvent event, Topology topology) {
 
+        String applicationId = event.getAppId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java
index 75b53b1..158d10f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterInstanceTerminatingProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterInstanceTerminatingEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -79,9 +80,15 @@ public class ClusterInstanceTerminatingProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterInstanceTerminatingEvent event, Topology topology) {
 
+        String applicationId = event.getAppId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
index 361c6d2..d38ada2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java
@@ -20,9 +20,11 @@ package org.apache.stratos.messaging.message.processor.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -104,6 +106,13 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor {
             }
         } else {
 
+            Cluster cluster = service.getCluster(event.getClusterId());
+
+            // Apply application filter
+            if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+                return false;
+            }
+
             // Apply changes to the topology
             service.removeCluster(event.getClusterId());
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
index 1168776..dc94a2a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java
@@ -26,6 +26,7 @@ import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.ClusterResetEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -76,9 +77,15 @@ public class ClusterResetMessageProcessor extends MessageProcessor {
 
     private boolean doProcess(ClusterResetEvent event, Topology topology) {
 
+        String applicationId = event.getAppId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
index 0317378..6172654 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/CompleteTopologyMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -88,13 +89,16 @@ public class CompleteTopologyMessageProcessor extends MessageProcessor {
             topology.addService(service);
         }
 
-        // Apply cluster filter
+        // Apply application & cluster filters
         for (Service service : topology.getServices()) {
             List<Cluster> clustersToRemove = new ArrayList<Cluster>();
             for (Cluster cluster : service.getClusters()) {
-                if (TopologyClusterFilter.apply(cluster.getClusterId())) {
+                if (TopologyApplicationFilter.apply(cluster.getAppId())) {
                     clustersToRemove.add(cluster);
-                } else {
+                } else if (TopologyClusterFilter.apply(cluster.getClusterId())) {
+                    clustersToRemove.add(cluster);
+                }
+                else {
                     // Add non filtered clusters to clusterId-cluster map
                     if (!topology.clusterExist(cluster.getClusterId())) {
                         topology.addToCluterMap(cluster);

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
index 99ca9b8..6e1d9d8 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberActivatedMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -71,9 +72,15 @@ public class MemberActivatedMessageProcessor extends MessageProcessor {
 
     private boolean doProcess(MemberActivatedEvent event, Topology topology) {
 
+        String applicationId = event.getApplicationId();
         String serviceName = event.getServiceName();
         String clusterId = event.getClusterId();
 
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(applicationId)) {
+            return false;
+        }
+
         // Apply service filter
         if (TopologyServiceFilter.apply(serviceName)) {
             return false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java
index 3669e6c..d6bfd4d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberCreatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberCreatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.processor.MessageProcessor;
@@ -103,6 +104,12 @@ public class MemberCreatedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         if (cluster.memberExists(event.getMemberId())) {
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Member already exists: [service] %s [cluster] %s [member] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java
index bf3bb97..57d0680 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberInitializedMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -99,6 +100,12 @@ public class MemberInitializedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member == null) {
             if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
index b0f13b5..f0b53ff 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberMaintenanceModeProcessor.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -102,6 +103,12 @@ public class MemberMaintenanceModeProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member == null) {
             if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
index 067d729..628123f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberReadyToShutdownMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -101,6 +102,12 @@ public class MemberReadyToShutdownMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member == null) {
             if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
index 51345e9..990d337 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberStartedMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -101,6 +102,12 @@ public class MemberStartedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member == null) {
             if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
index da56970..a8b3ac9 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberSuspendedMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -102,6 +103,12 @@ public class MemberSuspendedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member == null) {
             if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
index 468be6e..ee33269 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.domain.topology.Topology;
 import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -95,6 +96,7 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
         Cluster cluster = service.getCluster(event.getClusterId());
         if (cluster == null) {
             if (log.isWarnEnabled()) {
@@ -102,6 +104,12 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor {
             }
             return false;
         }
+
+        // Apply application filter
+        if(TopologyApplicationFilter.apply(cluster.getAppId())) {
+            return false;
+        }
+
         Member member = cluster.getMember(event.getMemberId());
         if (member != null) {
             // Apply member filter

http://git-wip-us.apache.org/repos/asf/stratos/blob/b7897af9/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
index 2ee08df..dd901fe 100755
--- a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
+++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
@@ -20,6 +20,7 @@ package org.apache.stratos.messaging.test;
 
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyApplicationFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -104,12 +105,7 @@ public class MessageFilterTest {
     @Test
     public final void testMemberFilter() {
         System.setProperty(StratosConstants.TOPOLOGY_MEMBER_FILTER,
-                TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID + "=lb-cluster1,lb-cluster2|" +
-                        TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "=np1,np2");
-
-        assertFalse(TopologyMemberFilter.apply("lb-cluster1", null));
-        assertFalse(TopologyMemberFilter.apply("lb-cluster2", null));
-        assertTrue(TopologyMemberFilter.apply("lb-cluster3", null));
+                TopologyMemberFilter.TOPOLOGY_MEMBER_FILTER_NETWORK_PARTITION_ID + "=np1,np2");
 
         assertFalse(TopologyMemberFilter.apply(null, "np1"));
         assertFalse(TopologyMemberFilter.apply(null, "np2"));
@@ -119,4 +115,15 @@ public class MessageFilterTest {
         assertFalse(TopologyMemberFilter.apply("lb-cluster2", "np2"));
         assertTrue(TopologyMemberFilter.apply("lb-cluster3", "np3"));
     }
+
+    @Test
+    public final void testApplicationFilter() {
+        System.setProperty(StratosConstants.TOPOLOGY_APPLICATION_FILTER,
+                TopologyApplicationFilter.TOPOLOGY_APPLICATION_FILTER_APPLICATION_ID +
+                        "=application-1,application-2");
+
+        assertFalse(TopologyApplicationFilter.apply("application-1"));
+        assertFalse(TopologyApplicationFilter.apply("application-2"));
+        assertTrue(TopologyApplicationFilter.apply("application-3"));
+    }
 }


Mime
View raw message