stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ud...@apache.org
Subject [2/4] git commit: adding message processors for topology
Date Wed, 22 Oct 2014 08:26:48 GMT
adding message processors for topology


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1763feac
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1763feac
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1763feac

Branch: refs/heads/4.0.0-grouping
Commit: 1763feac9dbfd7877f288af423661417c5c3fca9
Parents: ad31e2a
Author: Udara Liyanage <udara@wso2.com>
Authored: Wed Oct 22 13:50:19 2014 +0530
Committer: Udara Liyanage <udara@wso2.com>
Committed: Wed Oct 22 13:50:19 2014 +0530

----------------------------------------------------------------------
 .../ApplicationInactivatedMessageProcessor.java | 103 +++++++++++++++++++
 .../ApplicationTerminatedMessageProcessor.java  | 103 +++++++++++++++++++
 .../topology/TopologyMessageProcessorChain.java |  25 ++++-
 .../ApplicationTerminatingMessageProcessor.java | 102 ++++++++++++++++++
 4 files changed, 332 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
new file mode 100644
index 0000000..986e04e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationInactivatedMessageProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the
Topology.
+ */
+public class ApplicationInactivatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationInactivatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
+                    jsonToObject(message, ApplicationInactivatedEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using
available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the topology
+            application.setStatus(ApplicationStatus.Terminated);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application updated as inactivated : %s",
+                        application.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
new file mode 100644
index 0000000..3c9d753
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ApplicationTerminatedMessageProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the
Topology.
+ */
+public class ApplicationTerminatedMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationTerminatedMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
+                    jsonToObject(message, ApplicationInactivatedEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using
available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the topology
+            application.setStatus(ApplicationStatus.Terminating);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application updated as terminating : %s",
+                        application.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index db9e8b1..b0cefbb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -22,8 +22,12 @@ package org.apache.stratos.messaging.message.processor.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.listener.application.status.ApplicationInActivatedEventListener;
+import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatedEventListener;
+import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatingEventListener;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.topology.updater.ApplicationTerminatingMessageProcessor;
 
 /**
  * Defines default topology message processor chain.
@@ -51,6 +55,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain
{
     private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
     private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor;
     private ApplicationActivatedMessageProcessor applicationActivatedMessageProcessor;
+    private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor;
+    private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor;
+    private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor;
 
     public void initialize() {
         // Add topology event processors
@@ -108,6 +115,15 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain
{
         applicationActivatedMessageProcessor = new ApplicationActivatedMessageProcessor();
         add(applicationActivatedMessageProcessor);
 
+        applicationInactivatedMessageProcessor = new ApplicationInactivatedMessageProcessor();
+        add(applicationInactivatedMessageProcessor);
+
+        applicationTerminatedMessageProcessor = new ApplicationTerminatedMessageProcessor();
+        add(applicationTerminatedMessageProcessor);
+
+        applicationTerminatingMessageProcessor = new ApplicationTerminatingMessageProcessor();
+        add(applicationTerminatingMessageProcessor);
+
         if (log.isDebugEnabled()) {
             log.debug("Topology message processor chain initialized X1");
         }
@@ -150,7 +166,14 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain
{
             applicationRemovedMessageProcessor.addEventListener(eventListener);
         } else if (eventListener instanceof ApplicationActivatedEventListener) {
             applicationActivatedMessageProcessor.addEventListener(eventListener);
-        } else {
+        } else  if (eventListener instanceof ApplicationInActivatedEventListener){
+            applicationInactivatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof ApplicationTerminatedEventListener){
+            applicationTerminatedMessageProcessor.addEventListener(eventListener);
+        } else if(eventListener instanceof ApplicationTerminatingEventListener){
+            applicationTerminatingMessageProcessor.addEventListener(eventListener);
+        }
+        else {
             throw new RuntimeException("Unknown event listener");
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1763feac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java
new file mode 100644
index 0000000..fba6b53
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/updater/ApplicationTerminatingMessageProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology.updater;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ApplicationInactivatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor responsible to process the application Inactivation even and update the
Topology.
+ */
+public class ApplicationTerminatingMessageProcessor extends MessageProcessor {
+    private static final Log log =
+            LogFactory.getLog(ApplicationTerminatingMessageProcessor.class);
+
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        Topology topology = (Topology) object;
+
+        if (ApplicationInactivatedEvent.class.getName().equals(type)) {
+            // Return if topology has not been initialized
+            if (!topology.isInitialized())
+                return false;
+
+            // Parse complete message and build event
+            ApplicationInactivatedEvent event = (ApplicationInactivatedEvent) Util.
+                    jsonToObject(message, ApplicationInactivatedEvent.class);
+
+            TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+            try {
+                return doProcess(event, topology);
+
+            } finally {
+                TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+            }
+
+        } else {
+            if (nextProcessor != null) {
+                // ask the next processor to take care of the message.
+                return nextProcessor.process(type, message, topology);
+            } else {
+                throw new RuntimeException(String.format("Failed to process message using
available message processors: [type] %s [body] %s", type, message));
+            }
+        }
+    }
+
+    private boolean doProcess (ApplicationInactivatedEvent event, Topology topology) {
+
+        // Validate event against the existing topology
+        Application application = topology.getApplication(event.getAppId());
+        if (application == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Application does not exist: [service] %s",
+                        event.getAppId()));
+            }
+            return false;
+        } else {
+            // Apply changes to the topology
+            application.setStatus(ApplicationStatus.Inactive);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application updated as activated : %s",
+                        application.toString()));
+            }
+        }
+
+        // Notify event listeners
+        notifyEventListeners(event);
+        return true;
+
+    }
+}


Mime
View raw message