stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [06/10] removing application status and adding applications, cluster status topic
Date Fri, 31 Oct 2014 07:51:39 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
deleted file mode 100644
index 93eeb54..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-public class ApplicationStatusEventMessageListener implements MessageListener {
-    private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageListener.class);
-
-    private ApplicationStatusEventMessageQueue messageQueue;
-
-    public ApplicationStatusEventMessageListener(ApplicationStatusEventMessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-        if (message instanceof TextMessage) {
-            TextMessage receivedMessage = (TextMessage) message;
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
-                }
-                // Add received message to the queue
-                messageQueue.add(receivedMessage);
-
-            } catch (JMSException e) {
-                log.error(e.getMessage(), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
deleted file mode 100644
index ba455c9..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-
-import javax.jms.TextMessage;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class ApplicationStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
deleted file mode 100644
index 0b6cada..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.util.Constants;
-
-public class ApplicationStatusEventReceiver implements Runnable {
-    private static final Log log = LogFactory.getLog(ApplicationStatusEventReceiver.class);
-
-    private ApplicationStatusEventMessageDelegator messageDelegator;
-    private ApplicationStatusEventMessageListener messageListener;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
-
-    public ApplicationStatusEventReceiver() {
-        ApplicationStatusEventMessageQueue messageQueue = new ApplicationStatusEventMessageQueue();
-        this.messageDelegator = new ApplicationStatusEventMessageDelegator(messageQueue);
-        this.messageListener = new ApplicationStatusEventMessageListener(messageQueue);
-    }
-
-    public void addEventListener(EventListener eventListener) {
-        messageDelegator.addEventListener(eventListener);
-    }
-
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.APPLICATION_STATUS_TOPIC);
-            topicSubscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Application status event message receiver thread started");
-            }
-
-            // Start Application status event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Application status event message delegator thread started");
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Application status failed", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
new file mode 100644
index 0000000..68d44b0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.applications.ApplicationsMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+public class ApplicationsEventMessageDelegator implements Runnable {
+    private static final Log log = LogFactory.getLog(ApplicationsEventMessageDelegator.class);
+
+    private ApplicationsEventMessageQueue messageQueue;
+    private MessageProcessorChain processorChain;
+    private boolean terminated;
+
+    public ApplicationsEventMessageDelegator(ApplicationsEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+        this.processorChain = new ApplicationsMessageProcessorChain();
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Application status event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    TextMessage message = messageQueue.take();
+
+                    // Retrieve the header
+                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Application status event message received from queue: %s", type));
+                    }
+
+                    // Delegate message to message processor chain
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating application status event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (Exception e) {
+                    log.error("Failed to retrieve application status event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Application status event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminate topology event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+
+
+    private EventMessage jsonToEventMessage(String json) {
+
+        EventMessage event = new EventMessage();
+        String message;
+
+        //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
+        //message
+        String[] MessageParts = json.split(":", 3);
+
+        String eventType = MessageParts[0].trim();
+        eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Extracted [event type] %s", eventType));
+        }
+
+        event.setEventName(eventType);
+        String messageTag = MessageParts[1];
+        messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
+
+        if ("message".equals(messageTag)) {
+            message = MessageParts[2].trim();
+            //Remove trailing bracket twice to get the message
+            message = message.substring(0, message.lastIndexOf("}")).trim();
+            message = message.substring(0, message.lastIndexOf("}")).trim();
+            if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("[Extracted message] %s ", message));
+                }
+                event.setMessage(message);
+                return event;
+            }
+        }
+        return null;
+    }
+
+    private class EventMessage {
+        private String eventName;
+        private String message;
+
+        private String getEventName() {
+            return eventName;
+        }
+
+        private void setEventName(String eventName) {
+            this.eventName = eventName;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public void setMessage(String message) {
+            this.message = message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
new file mode 100644
index 0000000..936c174
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+public class ApplicationsEventMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(ApplicationsEventMessageListener.class);
+
+    private ApplicationsEventMessageQueue messageQueue;
+
+    public ApplicationsEventMessageListener(ApplicationsEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
+                }
+                // Add received message to the queue
+                messageQueue.add(receivedMessage);
+
+            } catch (JMSException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
new file mode 100644
index 0000000..604513e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ApplicationsEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
new file mode 100644
index 0000000..b7577bd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+public class ApplicationsEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
+
+    private ApplicationsEventMessageDelegator messageDelegator;
+    private ApplicationsEventMessageListener messageListener;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public ApplicationsEventReceiver() {
+        ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
+        this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
+        this.messageListener = new ApplicationsEventMessageListener(messageQueue);
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.APPLICATIONS_TOPIC);
+            topicSubscriber.setMessageListener(messageListener);
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Application status event message receiver thread started");
+            }
+
+            // Start Application status event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Application status event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Application status failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
new file mode 100644
index 0000000..a2fed87
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.cluster.status.ClusterStatusMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+public class ClusterStatusEventMessageDelegator implements Runnable {
+    private static final Log log = LogFactory.getLog(ClusterStatusEventMessageDelegator.class);
+
+    private ClusterStatusEventMessageQueue messageQueue;
+    private MessageProcessorChain processorChain;
+    private boolean terminated;
+
+    public ClusterStatusEventMessageDelegator(ClusterStatusEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+        this.processorChain = new ClusterStatusMessageProcessorChain();
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Application status event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    TextMessage message = messageQueue.take();
+
+                    // Retrieve the header
+                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Application status event message received from queue: %s", type));
+                    }
+
+                    // Delegate message to message processor chain
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating application status event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (Exception e) {
+                    log.error("Failed to retrieve application status event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Application status event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminate topology event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+
+
+    private EventMessage jsonToEventMessage(String json) {
+
+        EventMessage event = new EventMessage();
+        String message;
+
+        //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
+        //message
+        String[] MessageParts = json.split(":", 3);
+
+        String eventType = MessageParts[0].trim();
+        eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Extracted [event type] %s", eventType));
+        }
+
+        event.setEventName(eventType);
+        String messageTag = MessageParts[1];
+        messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
+
+        if ("message".equals(messageTag)) {
+            message = MessageParts[2].trim();
+            //Remove trailing bracket twice to get the message
+            message = message.substring(0, message.lastIndexOf("}")).trim();
+            message = message.substring(0, message.lastIndexOf("}")).trim();
+            if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("[Extracted message] %s ", message));
+                }
+                event.setMessage(message);
+                return event;
+            }
+        }
+        return null;
+    }
+
+    private class EventMessage {
+        private String eventName;
+        private String message;
+
+        private String getEventName() {
+            return eventName;
+        }
+
+        private void setEventName(String eventName) {
+            this.eventName = eventName;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public void setMessage(String message) {
+            this.message = message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
new file mode 100644
index 0000000..12c7800
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * JMS message listener that appends every received {@link TextMessage} to a
+ * {@link ClusterStatusEventMessageQueue}. Messages of any other type are ignored.
+ */
+public class ClusterStatusEventMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(ClusterStatusEventMessageListener.class);
+
+    private final ClusterStatusEventMessageQueue messageQueue;
+
+    /**
+     * @param messageQueue queue that received text messages are added to
+     */
+    public ClusterStatusEventMessageListener(ClusterStatusEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    /**
+     * Queues the given message if it is a {@link TextMessage}.
+     *
+     * @param message message delivered by the JMS provider
+     */
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster status event message received: %s", receivedMessage.getText()));
+                }
+                // Add received message to the queue
+                messageQueue.add(receivedMessage);
+
+            } catch (JMSException e) {
+                // Only getText() can raise this; the message is not queued when it does
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
new file mode 100644
index 0000000..9656800
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Blocking queue of JMS {@link TextMessage}s. It adds nothing to
+ * {@link LinkedBlockingQueue}; the subclass only gives the queue a dedicated type.
+ */
+public class ClusterStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
new file mode 100644
index 0000000..72ccaed
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+public class ClusterStatusEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
+
+    private ClusterStatusEventMessageDelegator messageDelegator;
+    private ClusterStatusEventMessageListener messageListener;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public ClusterStatusEventReceiver() {
+        ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
+        this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
+        this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.CLUSTER_STATUS_TOPIC);
+            topicSubscriber.setMessageListener(messageListener);
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Application status event message receiver thread started");
+            }
+
+            // Start Application status event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Application status event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Application status failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 33f2f22..2d2d532 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -24,7 +24,9 @@ public class Constants {
 	public static final String HEALTH_STAT_TOPIC = "summarized-health-stats";
     public static final String INSTANCE_STATUS_TOPIC = "instance-status";
     public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
-    public static final String APPLICATION_STATUS_TOPIC = "application-status";
+    public static final String APPLICATIONS_TOPIC = "applications";
+    // Must differ from APPLICATIONS_TOPIC, otherwise both receivers subscribe to the same topic
+    public static final String CLUSTER_STATUS_TOPIC = "cluster-status";
+
     public static final String PING_TOPIC = "ping";
     public static final String TENANT_TOPIC = "tenant";
     public static final String TENANT_RANGE_ALL = "*";


Mime
View raw message