stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From im...@apache.org
Subject [26/50] [abbrv] Renamed base module name to python_cartridgeagent Started decrypt password test
Date Mon, 27 Oct 2014 14:16:30 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py b/tools/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py
new file mode 100644
index 0000000..4ceb948
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/healthstatspublisher/healthstats.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import Thread
+import time
+import psutil
+import os
+
+from abstracthealthstatisticspublisher import *
+from ..databridge.agent import *
+from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
+from ..util import cartridgeagentutils, cartridgeagentconstants
+
+
+class HealthStatisticsPublisherManager(Thread):
+    """
+    Reads the memory usage and load average from an implementation of AbstractHealthStatisticsReader
+    and publishes them as ThriftEvents to a CEP server at a fixed interval
+    """
+    STREAM_NAME = "cartridge_agent_health_stats"
+    STREAM_VERSION = "1.0.0"
+    STREAM_NICKNAME = "agent health stats"
+    STREAM_DESCRIPTION = "agent health stats"
+
+    def __init__(self, publish_interval):
+        """
+        Initializes a new HealthStatisticsPublisherManager with a given number of seconds as the interval
+        :param int publish_interval: Number of seconds to sleep between two publish rounds
+        :return: void
+        """
+        Thread.__init__(self)
+
+        self.log = LogFactory().get_log(__name__)
+
+        self.publish_interval = publish_interval
+        """:type : int"""
+
+        self.terminated = False
+
+        self.publisher = HealthStatisticsPublisher()
+        """:type : HealthStatisticsPublisher"""
+        # TODO: load plugins for the reader
+        self.stats_reader = DefaultHealthStatisticsReader()
+        """:type : AbstractHealthStatisticsReader"""
+
+    def run(self):
+        while not self.terminated:
+            time.sleep(self.publish_interval)
+            # The sleep comes first, so the first reading is published one interval after start()
+            cartridge_stats = self.stats_reader.stat_cartridge_health()
+            self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage)
+            self.publisher.publish_memory_usage(cartridge_stats.memory_usage)
+
+            self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg)
+            self.publisher.publish_load_average(cartridge_stats.load_avg)
+        # Reached once self.terminated has been set: disconnect the underlying ThriftPublisher
+        self.publisher.publisher.disconnect()
+
+
+class HealthStatisticsPublisher:
+    """
+    Publishes memory usage and load average to thrift server
+    """
+    log = LogFactory().get_log(__name__)
+
+    def __init__(self):
+        # Checks that the CEP port is active (CEPPublisherException otherwise), then builds the stream definition and publisher
+        self.ports = []
+        self.ports.append(CEPPublisherConfiguration.get_instance().server_port)
+
+        self.cartridge_agent_config = CartridgeAgentConfiguration()
+        # NOTE(review): int() below fails if "port.check.timeout" is unset and read_property returns None - confirm
+        cartridgeagentutils.wait_until_ports_active(
+            CEPPublisherConfiguration.get_instance().server_ip,
+            self.ports,
+            int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False)))
+        cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports)
+        if not cep_active:
+            raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.")
+
+        self.stream_definition = HealthStatisticsPublisher.create_stream_definition()
+        HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition))
+        self.publisher = ThriftPublisher(
+            CEPPublisherConfiguration.get_instance().server_ip,
+            CEPPublisherConfiguration.get_instance().server_port,
+            CEPPublisherConfiguration.get_instance().admin_username,
+            CEPPublisherConfiguration.get_instance().admin_password,
+            self.stream_definition)
+
+        HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized")
+
+    @staticmethod
+    def create_stream_definition():
+        """
+        Create a StreamDefinition for publishing to CEP
+        """
+        stream_def = StreamDefinition()
+        stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME
+        stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION
+        stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME
+        stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
+
+        stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE)
+
+        return stream_def
+
+    def publish_memory_usage(self, memory_usage):
+        """
+        Publishes the given memory usage value to the thrift server as a ThriftEvent
+        :param float memory_usage: memory usage
+        """
+        # Payload values are appended in the same order as the attributes declared in create_stream_definition()
+        event = ThriftEvent()
+        event.payloadData.append(self.cartridge_agent_config.cluster_id)
+        event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+        event.payloadData.append(self.cartridge_agent_config.member_id)
+        event.payloadData.append(self.cartridge_agent_config.partition_id)
+        event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION)
+        event.payloadData.append(memory_usage)
+
+        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version))
+        self.publisher.publish(event)
+
+    def publish_load_average(self, load_avg):
+        """
+        Publishes the given load average value to the thrift server as a ThriftEvent
+        :param float load_avg: load average value
+        """
+        # Payload values are appended in the same order as the attributes declared in create_stream_definition()
+        event = ThriftEvent()
+        event.payloadData.append(self.cartridge_agent_config.cluster_id)
+        event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+        event.payloadData.append(self.cartridge_agent_config.member_id)
+        event.payloadData.append(self.cartridge_agent_config.partition_id)
+        event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE)
+        event.payloadData.append(load_avg)
+
+        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version))
+        self.publisher.publish(event)
+
+
+class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader):
+    """
+    Reads the health statistics of the local machine through psutil and os.getloadavg()
+    """
+
+    def __init__(self):
+        self.log = LogFactory().get_log(__name__)
+
+    def stat_cartridge_health(self):
+        stats = CartridgeHealthStatistics()
+        stats.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage()
+        stats.load_avg = DefaultHealthStatisticsReader.__read_load_avg()
+
+        self.log.debug("Memory read: %r, CPU read: %r" % (stats.memory_usage, stats.load_avg))
+        return stats
+
+    @staticmethod
+    def __read_mem_usage():
+        return psutil.virtual_memory().percent
+
+    @staticmethod
+    def __read_load_avg():
+        # os.getloadavg() yields the 1, 5 and 15 minute averages; only the first one is reported
+        return os.getloadavg()[0]
+
+
+class CEPPublisherConfiguration:
+    """
+    Singleton holding the CEP publisher settings. TODO: Extract common functionality
+    """
+
+    __instance = None
+    log = LogFactory().get_log(__name__)
+
+    @staticmethod
+    def get_instance():
+        """
+        Singleton instance retriever
+        :return: Instance
+        :rtype : CEPPublisherConfiguration
+        """
+        if CEPPublisherConfiguration.__instance is None:
+            CEPPublisherConfiguration.__instance = CEPPublisherConfiguration()
+
+        return CEPPublisherConfiguration.__instance
+
+    def __init__(self):
+        self.enabled = False
+        self.server_ip = None
+        self.server_port = None
+        self.admin_username = None
+        self.admin_password = None
+        self.cartridge_agent_config = CartridgeAgentConfiguration()
+
+        self.read_config()
+
+    def read_config(self):
+        # A missing flag (None) counts as disabled; when enabled, IP, port and credentials are required (RuntimeError)
+        enabled = self.cartridge_agent_config.read_property(cartridgeagentconstants.CEP_PUBLISHER_ENABLED, False)
+        self.enabled = enabled is not None and enabled.strip().lower() == "true"
+        if not self.enabled:
+            CEPPublisherConfiguration.log.info("CEP Publisher disabled")
+            return
+        CEPPublisherConfiguration.log.info("CEP Publisher enabled")
+
+        self.server_ip = self.cartridge_agent_config.read_property(
+            cartridgeagentconstants.CEP_RECEIVER_IP, False)
+        if self.server_ip is None or self.server_ip.strip() == "":
+            raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_IP)
+
+        self.server_port = self.cartridge_agent_config.read_property(
+            cartridgeagentconstants.CEP_RECEIVER_PORT, False)
+        if self.server_port is None or self.server_port.strip() == "":
+            raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_PORT)
+
+        self.admin_username = self.cartridge_agent_config.read_property(
+            cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME, False)
+        if self.admin_username is None or self.admin_username.strip() == "":
+            raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME)
+
+        self.admin_password = self.cartridge_agent_config.read_property(
+            cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD, False)
+        if self.admin_password is None or self.admin_password.strip() == "":
+            raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD)
+
+        CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized")

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/publisher/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py b/tools/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py
new file mode 100644
index 0000000..1ce8ffb
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/publisher/cartridgeagentpublisher.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import paho.mqtt.publish as publish
+
+from .. event.instance.status.events import *
+from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
+from .. util import cartridgeagentconstants
+from .. healthstatspublisher.healthstats import *
+from .. healthstatspublisher.abstracthealthstatisticspublisher import *
+
+
+log = LogFactory().get_log(__name__)
+
+started = False
+activated = False
+ready_to_shutdown = False
+maintenance = False
+
+publishers = {}
+""" :type : dict[str, EventPublisher] """
+
+
+def publish_instance_started_event():
+    """Publishes the InstanceStartedEvent once; any later call only logs a warning"""
+    global started, log
+    if started:
+        log.warn("Instance already started")
+        return
+
+    log.info("Publishing instance started event")
+    service_name = CartridgeAgentConfiguration().service_name
+    cluster_id = CartridgeAgentConfiguration().cluster_id
+    network_partition_id = CartridgeAgentConfiguration().network_partition_id
+    partition_id = CartridgeAgentConfiguration().partition_id
+    member_id = CartridgeAgentConfiguration().member_id
+    event = InstanceStartedEvent(service_name, cluster_id, network_partition_id, partition_id, member_id)
+    topic = cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_STARTED_EVENT
+    get_publisher(topic).publish(event)
+    started = True
+    log.info("Instance started event published")
+
+
+def publish_instance_activated_event():
+    """Publishes the InstanceActivatedEvent once and, if CEP is enabled, starts the health statistics publisher"""
+    global activated, log
+    if not activated:
+        log.info("Publishing instance activated event")
+        service_name = CartridgeAgentConfiguration().service_name
+        cluster_id = CartridgeAgentConfiguration().cluster_id
+        network_partition_id = CartridgeAgentConfiguration().network_partition_id
+        partition_id = CartridgeAgentConfiguration().partition_id
+        member_id = CartridgeAgentConfiguration().member_id
+
+        instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, network_partition_id, partition_id,
+                                                          member_id)
+        publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_ACTIVATED_EVENT)
+        publisher.publish(instance_activated_event)
+        log.info("Instance activated event published")
+        log.info("Starting health statistics notifier")
+
+        if CEPPublisherConfiguration.get_instance().enabled:
+            interval_default = 15  # seconds
+            interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval", False)
+            if interval is not None and len(interval) > 0:
+                try:
+                    interval = int(interval)
+                except ValueError:
+                    interval = interval_default
+            else:
+                interval = interval_default
+
+            health_stats_publisher = HealthStatisticsPublisherManager(interval)
+            log.info("Starting Health statistics publisher with interval %r" % interval)
+            health_stats_publisher.start()
+        else:
+            log.warn("Statistics publisher is disabled")
+
+        activated = True
+        log.info("Health statistics notifier started")
+    else:
+        log.warn("Instance already activated")
+
+
+def publish_maintenance_mode_event():
+    """Publishes the InstanceMaintenanceModeEvent once; any later call only logs a warning"""
+    global maintenance, log
+    if maintenance:
+        log.warn("Instance already in a Maintenance mode....")
+        return
+
+    log.info("Publishing instance maintenance mode event")
+
+    service_name = CartridgeAgentConfiguration().service_name
+    cluster_id = CartridgeAgentConfiguration().cluster_id
+    network_partition_id = CartridgeAgentConfiguration().network_partition_id
+    partition_id = CartridgeAgentConfiguration().partition_id
+    member_id = CartridgeAgentConfiguration().member_id
+
+    event = InstanceMaintenanceModeEvent(service_name, cluster_id, network_partition_id, partition_id, member_id)
+    topic = cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_MAINTENANCE_MODE_EVENT
+    get_publisher(topic).publish(event)
+
+    maintenance = True
+    log.info("Instance Maintenance mode event published")
+
+
+def publish_instance_ready_to_shutdown_event():
+    """Publishes the InstanceReadyToShutdownEvent once; any later call only logs a warning"""
+    global ready_to_shutdown, log
+    if not ready_to_shutdown:
+        log.info("Publishing instance ready to shutdown event")
+        service_name = CartridgeAgentConfiguration().service_name
+        cluster_id = CartridgeAgentConfiguration().cluster_id
+        network_partition_id = CartridgeAgentConfiguration().network_partition_id
+        partition_id = CartridgeAgentConfiguration().partition_id
+        member_id = CartridgeAgentConfiguration().member_id
+
+        instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, network_partition_id, partition_id,
+                                                          member_id)
+
+        publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_READY_TO_SHUTDOWN_EVENT)
+        publisher.publish(instance_shutdown_event)
+
+        ready_to_shutdown = True
+        log.info("Instance ReadyToShutDown event published")
+    else:
+        log.warn("Instance already in a ReadyToShutDown event....")
+
+
+def get_publisher(topic):
+    # One EventPublisher is created per topic and cached in the module-level publishers dict
+    if topic not in publishers:
+        publishers[topic] = EventPublisher(topic)
+    return publishers[topic]
+
+
+class EventPublisher:
+    """
+    Publishes events, serialised through their to_json() method, to one topic on the configured message broker
+    """
+    def __init__(self, topic):
+        self.__topic = topic
+
+    def publish(self, event):
+        mb_ip = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP)
+        mb_port = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT)
+        payload = event.to_json()
+        publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py
new file mode 100644
index 0000000..2456923
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py b/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py
new file mode 100644
index 0000000..bc026dd
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/subscriber/eventsubscriber.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+import paho.mqtt.client as mqtt
+
+
+class EventSubscriber(threading.Thread):
+    """
+    Provides functionality to subscribe to a given topic on the stratos MB and
+    register event handlers for various events.
+    """
+
+    def __init__(self, topic, ip, port):
+        threading.Thread.__init__(self)
+
+        # Maps an event name to its handler, e.g. {"ArtifactUpdateEvent": on_artifact_update_event}
+        self.__event_handlers = {}
+
+        self.log = LogFactory().get_log(__name__)
+
+        # The MQTT client is created in run(), i.e. once the thread has been started
+        self.__mb_client = None
+        self.__topic = topic
+        # Raised by on_connect once the topic subscription has been requested
+        self.__subscribed = False
+
+        self.__ip = ip
+        self.__port = port
+
+    def run(self):
+        self.__mb_client = mqtt.Client()
+        self.__mb_client.on_connect = self.on_connect
+        self.__mb_client.on_message = self.on_message
+
+        self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port))
+        self.__mb_client.connect(self.__ip, self.__port, 60)
+        self.__mb_client.loop_forever()
+
+    def register_handler(self, event, handler):
+        """
+        Adds an event handler function mapped to the provided event.
+        :param str event: Name of the event to attach the provided handler
+        :param handler: The handler function
+        :return: void
+        :rtype: void
+        """
+        self.__event_handlers[event] = handler
+        self.log.debug("Registered handler for event %r" % event)
+
+    def on_connect(self, client, userdata, flags, rc):
+        self.log.debug("Connected to message broker.")
+        self.__mb_client.subscribe(self.__topic)
+        self.__subscribed = True
+        self.log.debug("Subscribed to %r" % self.__topic)
+
+    def on_message(self, client, userdata, msg):
+        self.log.debug("Message received: %r:\n%r" % (msg.topic, msg.payload))
+        # The event name is the last segment of the topic path
+        event = msg.topic.rpartition('/')[2]
+
+        if event in self.__event_handlers:
+            handler = self.__event_handlers[event]
+            # Handler errors are logged here so that one failing event does not escape the message callback
+            try:
+                self.log.debug("Executing handler for event %r" % event)
+                handler(msg)
+            except Exception:
+                self.log.exception("Error processing %r event" % event)
+        else:
+            self.log.debug("Event handler not found for event : %r" % event)
+
+    def is_subscribed(self):
+        """
+        Checks if this event subscriber has connected and requested the subscription to the provided topic
+        :return: True if subscribed, False if otherwise
+        :rtype: bool
+        """
+        return self.__subscribed
+
+
+from .. util.log import LogFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/tenant/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py b/tools/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py
new file mode 100644
index 0000000..202bd35
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/tenant/tenantcontext.py
@@ -0,0 +1,184 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class Tenant:
+    """
+    Holds the details of a single tenant together with its service subscriptions
+    """
+
+    def __init__(self, tenant_id,  tenant_domain):
+        self.tenant_id = tenant_id
+        """ :type : int """
+        self.tenant_domain = tenant_domain
+        """ :type : str """
+        self.service_name_subscription_map = {}
+        """ :type : dict[str, Subscription] """
+
+    def get_subscription(self, service_name):
+        """
+        Looks up the Subscription object registered under the provided service name
+        :param str service_name: service name to be retrieved
+        :return: Subscription of the service or None if the service name doesn't exist
+        :rtype: Subscription
+        """
+        # dict.get() yields None for an unknown service name
+        subscriptions = self.service_name_subscription_map
+
+        return subscriptions.get(service_name)
+
+    def is_subscribed(self, service_name):
+        """
+        Tells whether this tenant holds a subscription for the given service name
+        :param str service_name: name of the service to check
+        :return: True if the tenant is subscribed to the given service name, False if not
+        :rtype: bool
+        """
+        return service_name in self.service_name_subscription_map
+
+    def add_subscription(self, subscription):
+        """
+        Registers the subscription under its service name, replacing any previous entry for that name
+        :param Subscription subscription: Subscription information to be added
+        :return: void
+        :rtype: void
+        """
+        self.service_name_subscription_map[subscription.service_name] = subscription
+
+    def remove_subscription(self, service_name):
+        """
+        Drops the subscription registered under the given service name, if there is one
+        :param str service_name: The service name of the subscription to be removed
+        :return: void
+        :rtype: void
+        """
+        # The None default makes pop() a no-op for an unknown service name
+        self.service_name_subscription_map.pop(service_name, None)
+
+
+class Subscription:
+    """
+    Subscription information of a particular subscription to a service
+    """
+
+    def __init__(self, service_name, cluster_ids):
+        self.service_name = service_name
+        """ :type : str """
+        self.cluster_ids = cluster_ids
+        """ :type : list[str]  """
+        self.subscription_domain_map = {}
+        """ :type : dict[str, SubscriptionDomain]  """
+
+    def add_subscription_domain(self, domain_name, application_context):
+        """
+        Adds a subscription domain, replacing any existing entry with the same domain name
+        :param str domain_name: name under which the SubscriptionDomain is stored
+        :param str application_context: application context passed on to the SubscriptionDomain
+        :return: void
+        :rtype: void
+        """
+        self.subscription_domain_map[domain_name] = SubscriptionDomain(domain_name, application_context)
+
+    def remove_subscription_domain(self, domain_name):
+        """
+        Removes the subscription domain of the specified domain name; unknown names are ignored
+        :param str domain_name: name of the subscription domain to remove
+        :return: void
+        :rtype: void
+        """
+        if domain_name in self.subscription_domain_map:
+            self.subscription_domain_map.pop(domain_name)
+
+    def subscription_domain_exists(self, domain_name):
+        """
+        Checks whether a subscription domain with the specified domain name exists
+        :param str domain_name: name of the subscription domain to look for
+        :return: True if the domain name is present, False if not
+        :rtype: bool
+        """
+        return domain_name in self.subscription_domain_map
+
+    def get_subscription_domains(self):
+        """
+        Returns the list of subscription domains of this subscription
+        :return: List of SubscriptionDomain objects
+        :rtype: list[SubscriptionDomain]
+        """
+        return self.subscription_domain_map.values()
+
+
+class SubscriptionDomain:
+    """
+    Represents a Subscription Domain: a domain name paired with its application context
+    """
+
+    def __init__(self, domain_name, application_context):
+        self.domain_name = domain_name
+        """ :type : str  """
+        self.application_context = application_context
+        """ :type : str  """
+
+
+class TenantContext:
+    """
+    Class-level registry of the known tenants, indexed both by tenant ID and by tenant domain
+    """
+    tenants = {}
+    initialized = False
+    tenant_domains = {"carbon.super": Tenant(-1234, "carbon.super")}
+
+    @staticmethod
+    def add_tenant(tenant):
+        TenantContext.tenants[tenant.tenant_id] = tenant
+        TenantContext.tenant_domains[tenant.tenant_domain] = tenant
+
+    @staticmethod
+    def remove_tenant(tenant_id):
+        # Unknown IDs are ignored; the domain entry may already be gone, so it is popped with a default
+        tenant = TenantContext.tenants.pop(tenant_id, None)
+        if tenant is not None:
+            TenantContext.tenant_domains.pop(tenant.tenant_domain, None)
+
+    @staticmethod
+    def update(tenants):
+        for tenant in tenants:
+            TenantContext.add_tenant(tenant)
+
+    @staticmethod
+    def get_tenant(tenant_id):
+        """
+        Gets the Tenant object of the provided tenant ID
+        :param int tenant_id: ID the tenant was added under
+        :return: Tenant object of the provided tenant ID, or None if the ID is unknown
+        :rtype: Tenant
+        """
+        if tenant_id in TenantContext.tenants:
+            return TenantContext.tenants[tenant_id]
+
+        return None
+
+    @staticmethod
+    def get_tenant_by_domain(tenant_domain):
+        """
+        Gets the Tenant object of the provided tenant domain
+        :param str tenant_domain: domain the tenant was added under
+        :return: Tenant object of the provided tenant domain, or None if the domain is unknown
+        :rtype: Tenant
+        """
+        if tenant_domain in TenantContext.tenant_domains:
+            return TenantContext.tenant_domains[tenant_domain]
+
+        return None
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/topology/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/topology/topologycontext.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/topology/topologycontext.py b/tools/python_cartridgeagent/cartridgeagent/modules/topology/topologycontext.py
new file mode 100644
index 0000000..5fe2ea4
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/topology/topologycontext.py
@@ -0,0 +1,454 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ..util import cartridgeagentconstants
+
+
+class Topology:
+    """
+    Represents the topology provided by the Cloud Controller
+    """
+
+    def __init__(self):
+        # services keyed by service name; :type : dict[str, Service]
+        self.service_map = {}
+        # set to True by TopologyContext.update(); :type : bool
+        self.initialized = False
+        # :type : str
+        self.json_str = None
+
+    def get_services(self):
+        """
+        Provides the list of services on the topology
+        :return: The list of Service objects
+        :rtype: list[Service]
+        """
+        # copy into a real list: on Python 3 dict.values() is a live view, which is not the documented
+        # list type and breaks when services are added or removed while the result is iterated
+        return list(self.service_map.values())
+
+    def get_service(self, service_name):
+        """
+        Provides the service information for the given service name
+        :param str service_name: service name to be retrieved
+        :return: Service object of the service, None if the provided service name is invalid
+        :rtype: Service
+        """
+        return self.service_map.get(service_name)
+
+    def add_service(self, service):
+        """
+        Adds a service to the list of services on the topology, replacing any service
+        already stored under the same service name
+
+        :param Service service:
+        :return: void
+        """
+        self.service_map[service.service_name] = service
+
+    def add_services(self, services):
+        """
+        Adds every service of the provided list to the topology
+
+        :param list[Service] services:
+        :return: void
+        """
+        for service in services:
+            self.add_service(service)
+
+    def remove_service(self, service_name):
+        """
+        Removes the service of the provided service name; unknown names are ignored
+        :param str service_name:
+        :return: void
+        """
+        self.service_map.pop(service_name, None)
+
+    def service_exists(self, service_name):
+        """
+        Checks if the service of the provided service name exists
+        :param str service_name:
+        :return: True if the service exists, False if otherwise
+        :rtype: bool
+        """
+        return service_name in self.service_map
+
+    def clear(self):
+        """
+        Clears the service information list
+        :return: void
+        """
+        # rebinds to a fresh dict (rather than clearing in place), so references to the old map keep their content
+        self.service_map = {}
+
+    def __str__(self):
+        """
+        to string override
+        :return: textual form holding the service map and the initialized flag
+        :rtype: str
+        """
+        return "Topology [serviceMap= %r , initialized= %r ]" % (self.service_map, self.initialized)
+
+
+class Service:
+    """
+    Represents a service on the topology
+    """
+
+    def __init__(self, service_name, service_type):
+        # :type : str
+        self.service_name = service_name
+        # :type : str
+        self.service_type = service_type
+        # clusters keyed by cluster id; :type : dict[str, Cluster]
+        self.cluster_id_cluster_map = {}
+        # ports keyed by their proxy attribute; :type : dict[str, Port]
+        self.port_map = {}
+        # :type : dict[str, str]
+        self.properties = {}
+
+    def get_clusters(self):
+        """
+        Provides the list of clusters in the particular service
+        :return: The list of Cluster objects
+        :rtype: list[Cluster]
+        """
+        # real list instead of a live dict view (Python 3), as documented
+        return list(self.cluster_id_cluster_map.values())
+
+    def add_cluster(self, cluster):
+        """
+        Adds a cluster to the service
+        :param Cluster cluster: the cluster to be added
+        :return: void
+        """
+        self.cluster_id_cluster_map[cluster.cluster_id] = cluster
+
+    def remove_cluster(self, cluster_id):
+        """
+        Removes the cluster with the given cluster id; unknown ids are ignored
+        :param str cluster_id:
+        :return: void
+        """
+        self.cluster_id_cluster_map.pop(cluster_id, None)
+
+    def cluster_exists(self, cluster_id):
+        """
+        Checks if the cluster with the given cluster id exists for the service
+        :param str cluster_id:
+        :return: True if the cluster for the given cluster id exists, False if otherwise
+        :rtype: bool
+        """
+        return cluster_id in self.cluster_id_cluster_map
+
+    def get_cluster(self, cluster_id):
+        """
+        Provides the Cluster information for the provided cluster id
+        :param str cluster_id: the cluster id to search for
+        :return: Cluster object for the given cluster id, None if the cluster id is invalid
+        :rtype: Cluster
+        """
+        return self.cluster_id_cluster_map.get(cluster_id)
+
+    def get_ports(self):
+        """
+        Returns the list of ports in the particular service
+        :return: The list of Port object
+        :rtype: list[Port]
+        """
+        # real list instead of a live dict view (Python 3), as documented
+        return list(self.port_map.values())
+
+    def get_port(self, proxy_port):
+        """
+        Provides the port information for the provided proxy port
+        :param str proxy_port:
+        :return: Port object for the provided port, None if port is invalid
+        :rtype: Port
+        """
+        return self.port_map.get(proxy_port)
+
+    def add_port(self, port):
+        """
+        Adds a port to the service, keyed by the port's proxy attribute
+        :param Port port:
+        :return: void
+        """
+        self.port_map[port.proxy] = port
+
+    def add_ports(self, ports):
+        """
+        Adds every port of the provided list to the service
+        :param list[Port] ports:
+        :return: void
+        """
+        for port in ports:
+            self.add_port(port)
+
+
+class Cluster:
+    """
+    Represents a cluster for a service
+    """
+
+    def __init__(self, service_name="", cluster_id="", deployment_policy_name="", autoscale_policy_name=""):
+        # :type : str
+        self.service_name = service_name
+        # :type : str
+        self.cluster_id = cluster_id
+        # :type : str
+        self.deployment_policy_name = deployment_policy_name
+        # :type : str
+        self.autoscale_policy_name = autoscale_policy_name
+        # :type : list[str]
+        self.hostnames = []
+        # members keyed by member id; :type : dict[str, Member]
+        self.member_map = {}
+
+        # "*" or "<start><delimiter><end>", see validate_tenant_range(); :type : str
+        self.tenant_range = None
+        # :type : bool
+        self.is_lb_cluster = False
+        # :type : bool
+        self.is_kubernetes_cluster = False
+        # :type : str
+        self.status = None
+        # :type : str
+        self.load_balancer_algorithm_name = None
+        # :type : dict[str, str]
+        self.properties = {}
+        # :type : str
+        self.member_list_json = None
+
+    def add_hostname(self, hostname):
+        """
+        Appends a hostname to the hostnames of the cluster
+        :param str hostname:
+        :return: void
+        """
+        self.hostnames.append(hostname)
+
+    def set_tenant_range(self, tenant_range):
+        """
+        Validates and stores the tenant range of the cluster
+        :param str tenant_range: '*' or a delimited range of numbers
+        :return: void
+        :exception: RuntimeError if the tenant range is invalid (the stored range is left untouched)
+        """
+        self.validate_tenant_range(tenant_range)
+        self.tenant_range = tenant_range
+
+    def get_members(self):
+        """
+        Provides the list of member information in the cluster
+        :return: The list of Member object
+        :rtype: list[Member]
+        """
+        # real list instead of a live dict view (Python 3), as documented
+        return list(self.member_map.values())
+
+    def add_member(self, member):
+        """
+        Adds a member to the cluster, keyed by its member id
+        :param Member member:
+        :return: void
+        """
+        self.member_map[member.member_id] = member
+
+    def remove_member(self, member_id):
+        """
+        Removes the member with the provided member id; unknown ids are ignored
+        :param str member_id:
+        :return: void
+        """
+        self.member_map.pop(member_id, None)
+
+    def get_member(self, member_id):
+        """
+        Provides the member information for the provided member id
+        :param str member_id:
+        :return: Member object for the provided member id, None if member id is invalid
+        :rtype: Member
+        """
+        return self.member_map.get(member_id)
+
+    def member_exists(self, member_id):
+        """
+        Checks if the member for the provided member id exists in this cluster
+        :param str member_id: member id to be searched
+        :return: True if the member exists, False if otherwise
+        :rtype: bool
+        """
+        return member_id in self.member_map
+
+    def __str__(self):
+        # %s formatting converts the list, dict, bool and None attributes; joining them to str with "+"
+        # raises TypeError
+        return "Cluster [serviceName=%s, clusterId=%s, autoscalePolicyName=%s, deploymentPolicyName=%s, " \
+               "hostNames=%s, tenantRange=%s, isLbCluster=%s, properties=%s]" \
+               % (self.service_name, self.cluster_id, self.autoscale_policy_name, self.deployment_policy_name,
+                  self.hostnames, self.tenant_range, self.is_lb_cluster, self.properties)
+
+    def tenant_id_in_range(self, tenant_id):
+        """
+        Check whether a given tenant id is in tenant range of the cluster.
+        :param str tenant_id: tenant id to be checked; an int is accepted as well
+        :return: True if the tenant id is in tenant id range, False if otherwise (including when no range
+        is set, the stored range is malformed, or the tenant id is not numeric)
+        :rtype: bool
+        """
+        if self.tenant_range is None:
+            return False
+
+        if self.tenant_range == "*":
+            return True
+
+        arr = self.tenant_range.split(cartridgeagentconstants.TENANT_RANGE_DELIMITER)
+        if len(arr) != 2:
+            # tenant_range can be assigned without going through set_tenant_range()
+            return False
+
+        try:
+            # the id is documented as str: compare numerically, never int against str
+            tenant = int(tenant_id)
+            tenant_start = int(arr[0])
+            if tenant < tenant_start:
+                return False
+            tenant_end = arr[1]
+            return tenant_end == "*" or tenant <= int(tenant_end)
+        except (TypeError, ValueError):
+            return False
+
+    def validate_tenant_range(self, tenant_range):
+        """
+        Validates the tenant range to be either '*' or a delimited range of numbers
+        (the end of the range may be '*')
+        :param str tenant_range: The tenant range string to be validated
+        :return: void if the provided tenant range is valid, RuntimeError if otherwise
+        :exception: RuntimeError if the tenant range is invalid
+        """
+        valid = False
+        if tenant_range == "*":
+            valid = True
+        elif tenant_range is not None:
+            arr = tenant_range.split(cartridgeagentconstants.TENANT_RANGE_DELIMITER)
+            if len(arr) == 2 and arr[0].isdigit():
+                valid = arr[1].isdigit() or arr[1] == "*"
+
+        if not valid:
+            raise RuntimeError("Tenant range %r is not valid" % tenant_range)
+
+
+class Member:
+    """
+    Represents a member on a particular cluster
+    """
+
+    # NOTE: the misspelt keyword "parition_id" is part of the public signature and is kept for callers
+    def __init__(self, service_name="", cluster_id="", network_partition_id="", parition_id="", member_id=""):
+        # :type : str
+        self.service_name = service_name
+        # :type : str
+        self.cluster_id = cluster_id
+        # :type : str
+        self.network_partition_id = network_partition_id
+        # :type : str
+        self.partition_id = parition_id
+        # :type : str
+        self.member_id = member_id
+        # ports keyed by their proxy attribute; :type : dict[str, Port]
+        self.port_map = {}
+
+        # :type : str
+        self.member_public_ip = None
+        # compared against MemberStatus.Activated by is_active()
+        self.status = None
+        # :type : str
+        self.member_ip = None
+        # :type : dict[str, str]
+        self.properties = {}
+        # :type : str
+        self.lb_cluster_id = None
+        # :type : str
+        self.json_str = None
+
+    def is_active(self):
+        """
+        Checks if the member is in active state
+        :return: True if active, False if otherwise
+        :rtype: bool
+        """
+        return self.status == MemberStatus.Activated
+
+    def get_ports(self):
+        """
+        Provides the list of the ports in the member
+        :return: List of Port objects
+        :rtype: list[Port]
+        """
+        # real list instead of a live dict view (Python 3), as documented
+        return list(self.port_map.values())
+
+    def get_port(self, proxy):
+        """
+        Provides the port information for the given port id
+        :param str proxy: The port id
+        :return: Port object of the provided port id, None if otherwise
+        :rtype: Port
+        """
+        return self.port_map.get(proxy)
+
+    def add_port(self, port):
+        """
+        Adds a port to the member, keyed by the port's proxy attribute
+        :param Port port:
+        :return: void
+        """
+        self.port_map[port.proxy] = port
+
+    def add_ports(self, ports):
+        """
+        Adds every port of the provided list to the member
+        :param list[Port] ports:
+        :return: void
+        """
+        for port in ports:
+            self.add_port(port)
+
+
+class Port:
+    """
+    Holds the protocol, value and proxy of a single port
+    """
+
+    def __init__(self, protocol, value, proxy):
+        # all three attributes are :type : str
+        self.protocol, self.value, self.proxy = protocol, value, proxy
+
+    def __str__(self):
+        template = "Port [protocol=%r, value=%r proxy=%r]"
+        return template % (self.protocol, self.value, self.proxy)
+
+
+class ServiceType:
+    """
+    ServiceType enum: integer codes of the service types
+    """
+    SingleTenant, MultiTenant = 1, 2
+
+
+class ClusterStatus:
+    """
+    ClusterStatus enum: integer codes of the cluster states
+    """
+    Created, In_Maintenance, Removed = 1, 2, 3
+
+
+class MemberStatus:
+    """
+    MemberStatus enum
+
+    Integer codes of the member states; Member.is_active() compares Member.status against Activated.
+    """
+    Created = 1
+    Starting = 2
+    Activated = 3
+    In_Maintenance = 4
+    ReadyToShutDown = 5
+    Terminated = 6
+    # NOTE(review): Suspended and ShuttingDown share the value 0, so they compare equal and cannot be
+    # told apart by value - confirm whether this is intended.
+    Suspended = 0
+    ShuttingDown = 0
+
+
+class TopologyContext:
+    """
+    Keeps the single, class-level Topology instance that models the topology provided by the Cloud Controller
+    """
+    topology = None
+    # TODO: read write locks, Lock() and RLock()
+
+    @staticmethod
+    def get_topology():
+        """
+        Returns the shared topology, creating an empty one on first use
+        :return: the shared Topology object
+        :rtype: Topology
+        """
+        #TODO: thread-safety missing
+        if TopologyContext.topology is not None:
+            return TopologyContext.topology
+
+        TopologyContext.topology = Topology()
+        return TopologyContext.topology
+
+    @staticmethod
+    def update(topology):
+        """
+        Replaces the shared topology with the provided one and marks it as initialized
+        :param Topology topology: the new topology
+        :return: void
+        """
+        TopologyContext.topology = topology
+        # flag is set on the stored object, i.e. on the instance the caller passed in
+        setattr(TopologyContext.topology, "initialized", True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/__init__.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/__init__.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/asyncscheduledtask.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/asyncscheduledtask.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/asyncscheduledtask.py
new file mode 100644
index 0000000..4ff0416
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/asyncscheduledtask.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from threading import Thread
+
+class AbstractAsyncScheduledTask:
+    """
+    Contract for a task that the ScheduledExecutor runs repeatedly
+    """
+
+    def execute_task(self):
+        """
+        The work to be carried out on every interval; subclasses must override this method,
+        the base implementation only raises NotImplementedError.
+        """
+        raise NotImplementedError()
+
+
+class ScheduledExecutor(Thread):
+    """
+    Executes a given task with a given interval until being terminated
+    """
+
+    def __init__(self, delay, task):
+        """
+        Creates a ScheduledExecutor thread to handle interval based repeated execution of a given task of type
+        AbstractAsyncScheduledTask
+        :param int delay: The interval to keep between executions, in seconds (passed to time.sleep)
+        :param AbstractAsyncScheduledTask task: The task to be implemented
+        :return:
+        """
+
+        Thread.__init__(self)
+        # :type : int
+        self.delay = delay
+        # :type : AbstractAsyncScheduledTask
+        self.task = task
+        # set by terminate(), polled by run(); :type : bool
+        self.terminated = False
+
+    def run(self):
+        """
+        Start the scheduled task with a sleep time of delay in between. Every execution runs in its own
+        thread, so a slow task does not delay the next one.
+        :return:
+        """
+        while not self.terminated:
+            time.sleep(self.delay)
+            # terminate() may have been called while sleeping; do not fire the task one more time
+            if self.terminated:
+                break
+            task_thread = Thread(target=self.task.execute_task)
+            task_thread.start()
+
+    def terminate(self):
+        """
+        Terminate the scheduled task. Allow a maximum of 'delay' seconds to be terminated.
+        Executions that were already started are not interrupted.
+        :return: void
+        """
+        self.terminated = True
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentconstants.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentconstants.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentconstants.py
new file mode 100644
index 0000000..70afb30
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentconstants.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Keys and fixed values shared by the cartridge agent modules. The string values are matched against
+# external configuration and payload data at runtime, so they must not be "corrected" cosmetically.
+
+PARAM_FILE_PATH = "param.file.path"
+EXTENSIONS_DIR = "extensions.dir"
+
+MB_IP = "mb.ip"
+MB_PORT = "mb.port"
+
+CARTRIDGE_KEY = "CARTRIDGE_KEY"
+APP_PATH = "APP_PATH"
+# NOTE(review): value is spelt "SERIVCE_GROUP" (also in SERVICE_GROUP_TOPOLOGY_KEY below) - presumably it
+# mirrors the key used by the producer of the payload; confirm before changing.
+SERVICE_GROUP = "SERIVCE_GROUP"
+SERVICE_NAME = "SERVICE_NAME"
+CLUSTER_ID = "CLUSTER_ID"
+LB_CLUSTER_ID = "LB_CLUSTER_ID"
+NETWORK_PARTITION_ID = "NETWORK_PARTITION_ID"
+PARTITION_ID = "PARTITION_ID"
+MEMBER_ID = "MEMBER_ID"
+TENANT_ID = "TENANT_ID"
+REPO_URL = "REPO_URL"
+PORTS = "PORTS"
+DEPLOYMENT = "DEPLOYMENT"
+MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE"
+WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE"
+PERSISTENCE_MAPPING = "PERSISTENCE_MAPPING"
+
+# stratos.sh environment variables keys
+LOG_FILE_PATHS = "LOG_FILE_PATHS"
+MEMORY_CONSUMPTION = "memory_consumption"
+LOAD_AVERAGE = "load_average"
+PORTS_NOT_OPEN = "ports_not_open"
+MULTITENANT = "MULTITENANT"
+CLUSTERING = "CLUSTERING"
+# same value as MIN_COUNT further down
+MIN_INSTANCE_COUNT = "MIN_COUNT"
+ENABLE_ARTIFACT_UPDATE = "enable.artifact.update"
+ARTIFACT_UPDATE_INTERVAL = "artifact.update.interval"
+COMMIT_ENABLED = "COMMIT_ENABLED"
+AUTO_COMMIT = "auto.commit"
+AUTO_CHECKOUT = "auto.checkout"
+LISTEN_ADDRESS = "listen.address"
+PROVIDER = "PROVIDER"
+INTERNAL = "internal"
+LB_PRIVATE_IP = "lb.private.ip"
+LB_PUBLIC_IP = "lb.public.ip"
+
+# stratos.sh extension points shell scripts names keys
+INSTANCE_STARTED_SCRIPT = "extension.instance.started"
+START_SERVERS_SCRIPT = "extension.start.servers"
+INSTANCE_ACTIVATED_SCRIPT = "extension.instance.activated"
+ARTIFACTS_UPDATED_SCRIPT = "extension.artifacts.updated"
+CLEAN_UP_SCRIPT = "extension.clean"
+MOUNT_VOLUMES_SCRIPT = "extension.mount.volumes"
+MEMBER_ACTIVATED_SCRIPT = "extension.member.activated"
+MEMBER_TERMINATED_SCRIPT = "extension.member.terminated"
+MEMBER_SUSPENDED_SCRIPT = "extension.member.suspended"
+MEMBER_STARTED_SCRIPT = "extension.member.started"
+COMPLETE_TOPOLOGY_SCRIPT = "extension.complete.topology"
+COMPLETE_TENANT_SCRIPT = "extension.complete.tenant"
+SUBSCRIPTION_DOMAIN_ADDED_SCRIPT = "extension.subscription.domain.added"
+SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT = "extension.subscription.domain.removed"
+ARTIFACTS_COPY_SCRIPT = "extension.artifacts.copy"
+TENANT_SUBSCRIBED_SCRIPT = "extension.tenant.subscribed"
+TENANT_UNSUBSCRIBED_SCRIPT = "extension.tenant.unsubscribed"
+
+SERVICE_GROUP_TOPOLOGY_KEY = "payload_parameter.SERIVCE_GROUP"
+CLUSTERING_TOPOLOGY_KEY = "payload_parameter.CLUSTERING"
+# same value as PRIMARY further down
+CLUSTERING_PRIMARY_KEY = "PRIMARY"
+
+# path contains the same number as SUPER_TENANT_ID
+SUPERTENANT_TEMP_PATH = "/tmp/-1234/"
+
+DEPLOYMENT_MANAGER = "manager"
+DEPLOYMENT_WORKER = "worker"
+DEPLOYMENT_DEFAULT = "default"
+SUPER_TENANT_REPO_PATH = "super.tenant.repository.path"
+TENANT_REPO_PATH = "tenant.repository.path"
+
+# topic names to subscribe
+INSTANCE_NOTIFIER_TOPIC = "instance/#"
+HEALTH_STAT_TOPIC = "health/#"
+TOPOLOGY_TOPIC = "topology/#"
+TENANT_TOPIC = "tenant/#"
+INSTANCE_STATUS_TOPIC = "instance/status/"
+
+# Messaging Model
+# separator between start and end of a tenant range (used by Cluster.validate_tenant_range /
+# Cluster.tenant_id_in_range)
+TENANT_RANGE_DELIMITER = "-"
+
+INSTANCE_STARTED_EVENT = "InstanceStartedEvent"
+INSTANCE_ACTIVATED_EVENT = "InstanceActivatedEvent"
+INSTANCE_MAINTENANCE_MODE_EVENT = "InstanceMaintenanceModeEvent"
+INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent"
+
+PUBLISHER_SERVICE_NAME = "publisher"
+APISTORE_SERVICE_NAME = "apistore"
+APIMANAGER_SERVICE_NAME = "apim"
+# NOTE(review): the values of the next two constants look swapped relative to their names
+# ("gatewaymgt" for GATEWAY, "gateway" for GATEWAY_MGT) - confirm against the service definitions.
+GATEWAY_SERVICE_NAME = "gatewaymgt"
+GATEWAY_MGT_SERVICE_NAME = "gateway"
+KEY_MANAGER_SERVICE_NAME = "keymanager"
+
+PRIMARY = "PRIMARY"
+MIN_COUNT = "MIN_COUNT"
+
+# multi tenant constants
+# note: these IDs are strings, not ints
+INVALID_TENANT_ID = "-1"
+SUPER_TENANT_ID = "-1234"
+
+# strftime-style pattern: year.month.day
+DATE_FORMAT = "%Y.%m.%d"
+
+PORT_CHECK_TIMEOUT = "port.check.timeout"
+
+CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled"
+CEP_RECEIVER_IP = "thrift.receiver.ip"
+CEP_RECEIVER_PORT = "thrift.receiver.port"
+CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username"
+CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password"
+
+MONITORING_PUBLISHER_ENABLED = "enable.data.publisher"
+MONITORING_RECEIVER_IP = "monitoring.server.ip"
+MONITORING_RECEIVER_PORT = "monitoring.server.port"
+MONITORING_RECEIVER_SECURE_PORT = "monitoring.server.secure.port"
+MONITORING_SERVER_ADMIN_USERNAME = "monitoring.server.admin.username"
+MONITORING_SERVER_ADMIN_PASSWORD = "monitoring.server.admin.password"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentutils.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentutils.py
new file mode 100644
index 0000000..6ae89b1
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/cartridgeagentutils.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from Crypto.Cipher import AES
+import base64
+import os
+import time
+import socket
+import shutil
+
+#from log import LogFactory
+
+def unpad(s):
+    """
+    Strips trailing padding whose length is encoded in the last character (as PKCS#7-style padding does):
+    the ordinal value of the last character is the number of characters removed from the end.
+    The padding content itself is not verified.
+    :param s: padded str or bytes
+    :return: the value without its padding, same type as the input; empty input is returned unchanged
+    """
+    if not s:
+        return s
+
+    # s[-1:] is a length-1 slice, so ord() works for str as well as bytes (indexing bytes gives an int
+    # on Python 3, which ord() rejects)
+    return s[0:-ord(s[-1:])]
+
+#log = LogFactory().get_log(__name__)
+
+
+def current_milli_time():
+    """
+    Provides the current time.time() value in milliseconds
+    :return: milliseconds, rounded to the nearest integer
+    :rtype: int
+    """
+    return int(round(time.time() * 1000))
+
+
+def decrypt_password(pass_str, secret):
+    """
+    Decrypts the given password using the given secret. The encryption is assumed to be done
+    without IV, in AES.
+    :param str pass_str: Encrypted password string in Base64 encoding
+    :param str secret: The secret string
+    :return: The decrypted password; None if pass_str is None, the stripped input if it is blank,
+    an empty string if decryption fails
+    :rtype: str
+    """
+
+    # None has no strip(); hand it back unchanged instead of raising AttributeError
+    if pass_str is None:
+        return pass_str
+
+    stripped = pass_str.strip()
+    if stripped == "":
+        return stripped
+
+    dec_pass = ""
+
+    try:
+        #log.debug("Decrypting password")
+        bdecoded_pass = base64.b64decode(pass_str)
+        #secret length should be 16
+        cipher = AES.new(secret, AES.MODE_ECB)
+        dec_pass = unpad(cipher.decrypt(bdecoded_pass))
+    except Exception:
+        # best effort: any decoding/decryption failure yields the empty default, but interpreter exits
+        # and keyboard interrupts are no longer swallowed
+        pass
+        #log.exception("Exception occurred while decrypting password")
+
+    # NOTE: the decrypted value is deliberately not written to the log
+    return dec_pass
+
+
+def create_dir(path):
+    """
+    Creates the directory at the provided path (single level, via os.mkdir)
+    :param path: The path to the directory to be made
+    :return: True if mkdir was successful, False if it raised OSError (e.g. the dir already exists)
+    :rtype: bool
+    """
+    created = False
+    try:
+        os.mkdir(path)
+    except OSError:
+        pass
+        #log.exception("Directory creating failed in [%r]. Directory already exists. " % path)
+    else:
+        #log.info("Successfully created directory [%r]" % path)
+        created = True
+
+    return created
+
+
+def delete_folder_tree(path):
+    """
+    Recursively removes the provided folder with shutil.rmtree; an OSError raised while doing so
+    (e.g. the folder does not exist) is ignored
+    :param str path: Full path of the folder
+    :return: void
+    """
+    try:
+        shutil.rmtree(path)
+    except OSError:
+        #log.exception("Deletion of folder path %r failed." % path)
+        return
+
+    #log.debug("Directory [%r] deleted." % path)
+
+
+def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000):
+    """
+    Blocks until the given list of ports become active or the timeout elapses. The ports are checked
+    immediately and then every 5 seconds.
+    :param str ip_address: Ip address of the member to be checked
+    :param list[str] ports: List of ports to be checked
+    :param int ports_check_timeout: The timeout in milliseconds, defaults to 1000*60*10
+    :return: True if all ports became active, False if the timeout elapsed first
+    :rtype: bool
+    :exception: RuntimeError (from check_ports_active) if no ports are given
+    """
+    if ports_check_timeout is None:
+        ports_check_timeout = 1000 * 60 * 10
+
+    #log.debug("Port check timeout: %r" % ports_check_timeout)
+
+    start_time = current_milli_time()
+    while True:
+        #log.info("Waiting for ports to be active: [ip] %r [ports] %r" % (ip_address, ports))
+        if check_ports_active(ip_address, ports):
+            # return right away; sleeping once more after success only delays the caller
+            #log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports))
+            return True
+
+        if current_milli_time() - start_time > ports_check_timeout:
+            return False
+
+        time.sleep(5)
+
+
+def check_ports_active(ip_address, ports):
+    """
+    Checks the given list of port addresses for active state by opening a TCP connection to each
+    (5 second timeout per port)
+    :param str ip_address: Ip address of the member to be checked
+    :param list[str] ports: The list of ports to be checked
+    :return: True if the ports are active, False if at least one is not active
+    :rtype: bool
+    :exception: RuntimeError if the list of ports is empty or None
+    """
+    if not ports:
+        raise RuntimeError("No ports found")
+
+    for port in ports:
+        s = socket.socket()
+        s.settimeout(5)
+        try:
+            s.connect((ip_address, int(port)))
+            #log.debug("Port %r is active" % port)
+        except socket.error:
+            #log.debug("Port %r is not active" % port)
+            return False
+        finally:
+            # release the descriptor on every path, including a failed connect
+            s.close()
+
+    return True
+
+
+def get_carbon_server_property(property_key):
+    """
+    Placeholder for looking up a property value in the carbon.xml file; not implemented yet,
+    every call raises NotImplementedError.
+    TODO: Get carbon server xml location
+    :param str property_key: Property key to look for
+    :return: (once implemented) The value of the property, None if the property key is invalid or not present
+    :rtype : str
+    """
+    raise NotImplementedError()
+
+
+def get_working_dir():
+    """
+    Returns the base directory of the cartridge agent, i.e. the part of this module's directory path
+    that precedes the last occurrence of "modules". If the path does not contain "modules" the whole
+    directory path is returned.
+    :return: Base working dir path
+    :rtype : str
+    """
+    module_dir = os.path.abspath(os.path.dirname(__file__))
+    # "/path/to/cartridge-agent/modules/util" -> "/path/to/cartridge-agent/"
+    # split at the LAST "modules": an ancestor directory whose name contains "modules"
+    # (e.g. "/opt/modules/agent/modules/util") must not truncate the result
+    return module_dir.rsplit("modules", 1)[0]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/extensionutils.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/extensionutils.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/extensionutils.py
new file mode 100644
index 0000000..6c58852
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/extensionutils.py
@@ -0,0 +1,494 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import subprocess
+import time
+
+from log import LogFactory
+from .. config import cartridgeagentconfiguration
+
+
+# Module-level logger, obtained by name from the LogFactory singleton
+log = LogFactory().get_log(__name__)
+
+# Agent configuration object, created once when this module is imported and shared by every function below
+cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration()
+
+
+def execute_copy_artifact_extension(source, destination):
+    """
+    Runs the artifacts copy extension script. The source and destination are appended to the
+    script path, separated by spaces. Any failure is logged and not propagated.
+    :param str source: Value appended to the command after the script path
+    :param str destination: Value appended to the command after the source
+    """
+    try:
+        log.debug("Executing artifacts copy extension")
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False)
+        command = prepare_command(script_name)
+
+        output, _ = execute_command(command + " " + source + " " + destination)
+        log.debug("Artifacts copy script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute artifacts copy extension")
+
+
+def execute_instance_started_extension(env_params):
+    """
+    Runs the instance started extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing instance started extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Instance started script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute instance started extension")
+
+
+def execute_instance_activated_extension():
+    """
+    Runs the instance activated extension script without extra environment parameters.
+    Any failure is logged and not propagated.
+    """
+    try:
+        log.debug("Executing instance activated extension")
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False)
+        command = prepare_command(script_name)
+
+        output, _ = execute_command(command)
+        log.debug("Instance activated script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute instance activated extension")
+
+
+def execute_artifacts_updated_extension(env_params):
+    """
+    Runs the artifacts updated extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing artifacts updated extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Artifacts updated script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute artifacts updated extension")
+
+
+def execute_subscription_domain_added_extension(env_params):
+    """
+    Runs the subscription domain added extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing subscription domain added extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Subscription domain added script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute subscription domain added extension")
+
+
+def execute_subscription_domain_removed_extension(env_params):
+    """
+    Runs the subscription domain removed extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing subscription domain removed extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Subscription domain removed script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute subscription domain removed extension")
+
+
+def execute_start_servers_extension(env_params):
+    """
+    Runs the start servers extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing start servers extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.START_SERVERS_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Start servers script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute start servers extension")
+
+
+def execute_complete_topology_extension(env_params):
+    """
+    Runs the complete topology extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing complete topology extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Complete topology script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute complete topology extension")
+
+
+def execute_complete_tenant_extension(env_params):
+    """
+    Runs the complete tenant extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing complete tenant extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Complete tenant script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute complete tenant extension")
+
+
+def execute_tenant_subscribed_extension(env_params):
+    """
+    Runs the tenant subscribed extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing tenant subscribed extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Tenant subscribed script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute tenant subscribed extension")
+
+
+def execute_tenant_unsubscribed_extension(env_params):
+    """
+    Runs the tenant unsubscribed extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing tenant unsubscribed extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Tenant unsubscribed script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute tenant unsubscribed extension")
+
+
+def execute_member_terminated_extension(env_params):
+    """
+    Runs the member terminated extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing member terminated extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Member terminated script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute member terminated extension")
+
+
+def execute_member_suspended_extension(env_params):
+    """
+    Runs the member suspended extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing member suspended extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Member suspended script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute member suspended extension")
+
+
+def execute_member_started_extension(env_params):
+    """
+    Runs the member started extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing member started extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Member started script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute member started extension")
+
+
+def wait_for_complete_topology():
+    """
+    Blocks the calling thread until the topology held by TopologyContext is flagged as
+    initialized, checking every 5 seconds and logging a message before each wait.
+    """
+    while True:
+        if TopologyContext.topology.initialized:
+            return
+
+        log.info("Waiting for complete topology event...")
+        time.sleep(5)
+
+
+def check_topology_consistency(service_name, cluster_id, member_id):
+    """
+    Verifies that the given service, cluster and member can all be found in the current topology.
+    The first missing level is reported in the error log.
+    :param str service_name: Name of the service to look up in the topology
+    :param str cluster_id: Id of the cluster to look up in the service
+    :param str member_id: Id of the member to look up in the cluster
+    :return: True if service, cluster and member are all present, False otherwise
+    :rtype: bool
+    """
+    service = TopologyContext.get_topology().get_service(service_name)
+    if service is not None:
+        cluster = service.get_cluster(cluster_id)
+        if cluster is not None:
+            if cluster.get_member(member_id) is not None:
+                return True
+
+            log.error("Member id not found in topology [member] %r" % member_id)
+        else:
+            log.error("Cluster id not found in topology [cluster] %r" % cluster_id)
+    else:
+        log.error("Service not found in topology [service] %r" % service_name)
+
+    return False
+
+
+def is_relevant_member_event(service_name, cluster_id, lb_cluster_id):
+    """
+    Decides whether a member event should be handled by this agent. The event is relevant if
+    its cluster id or LB cluster id equals the cluster id of this agent, or if the service in
+    the event belongs to the same service group and matches one of the service name / deployment
+    combinations checked below.
+    :param str service_name: Service name carried by the event
+    :param str cluster_id: Cluster id carried by the event
+    :param str lb_cluster_id: Load balancer cluster id carried by the event
+    :return: True if the event is relevant for this agent, False otherwise
+    :rtype: bool
+    """
+    cluster_id_in_payload = cartridge_agent_config.cluster_id
+    if cluster_id_in_payload is None:
+        return False
+
+    topology = TopologyContext.get_topology()
+    if topology is None or not topology.initialized:
+        return False
+
+    if cluster_id_in_payload == cluster_id:
+        return True
+
+    if cluster_id_in_payload == lb_cluster_id:
+        return True
+
+    service_group_in_payload = cartridge_agent_config.service_group
+    if service_group_in_payload is None:
+        return False
+
+    # get_service() returns None for an unknown service; that used to raise AttributeError here
+    service = topology.get_service(service_name)
+    if service is None:
+        return False
+
+    service_properties = service.properties
+    if service_properties is None:
+        return False
+
+    # NOTE(review): assumes properties is a dict; .get() keeps a missing key from raising KeyError - confirm
+    member_service_group = service_properties.get(cartridgeagentconstants.SERVICE_GROUP_TOPOLOGY_KEY)
+    if member_service_group is None or member_service_group != service_group_in_payload:
+        return False
+
+    if service_name == cartridge_agent_config.service_name:
+        log.debug("Service names are same")
+        return True
+
+    # store and publisher services are treated as relevant to each other
+    if cartridgeagentconstants.APISTORE_SERVICE_NAME == cartridge_agent_config.service_name \
+            and service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME:
+        log.debug("Service name in payload is [store]. Serivce name in event is [%r] " % service_name)
+        return True
+
+    if cartridgeagentconstants.PUBLISHER_SERVICE_NAME == cartridge_agent_config.service_name \
+            and service_name == cartridgeagentconstants.APISTORE_SERVICE_NAME:
+        log.debug("Service name in payload is [publisher]. Serivce name in event is [%r] " % service_name)
+        return True
+
+    # a worker cares about its manager service and a manager about its worker service
+    if cartridgeagentconstants.DEPLOYMENT_WORKER == cartridge_agent_config.deployment \
+            and service_name == cartridge_agent_config.manager_service_name:
+        log.debug("Deployment is worker. Worker's manager service name & service name in event are same")
+        return True
+
+    if cartridgeagentconstants.DEPLOYMENT_MANAGER == cartridge_agent_config.deployment \
+            and service_name == cartridge_agent_config.worker_service_name:
+        log.debug("Deployment is manager. Manager's worker service name & service name in event are same")
+        return True
+
+    return False
+
+
+def execute_volume_mount_extension(persistance_mappings_payload):
+    """
+    Runs the volume mounting extension script. The payload is appended to the script path,
+    separated by a space. Any failure is logged and not propagated.
+    :param str persistance_mappings_payload: Value appended to the command after the script path
+    """
+    try:
+        log.debug("Executing volume mounting extension: [payload] %r" % persistance_mappings_payload)
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False)
+        command = prepare_command(script_name)
+
+        output, _ = execute_command(command + " " + persistance_mappings_payload)
+        log.debug("Volume mount script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute Volume mount extension")
+
+
+def execute_cleanup_extension():
+    """
+    Runs the cleanup extension script without extra environment parameters.
+    Any failure is logged and not propagated.
+    """
+    try:
+        log.debug("Executing cleanup extension")
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.CLEAN_UP_SCRIPT, False)
+        command = prepare_command(script_name)
+
+        output, _ = execute_command(command)
+        log.debug("Cleanup script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute Cleanup extension")
+
+
+def execute_member_activated_extension(env_params):
+    """
+    Runs the member activated extension script. Any failure is logged and not propagated.
+    :param dict[str, str] env_params: Event specific values; the common payload parameters are
+        added and None values removed before they are passed to the script's environment
+    """
+    try:
+        log.debug("Executing member activated extension")
+
+        script_name = cartridge_agent_config.read_property(
+            cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False)
+        command = prepare_command(script_name)
+        env_params = add_payload_parameters(env_params)
+        env_params = clean_process_parameters(env_params)
+
+        output, _ = execute_command(command, env_params)
+        log.debug("Member activated script returned: %r" % output)
+    except Exception:
+        # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate
+        log.exception("Could not execute member activated extension")
+
+
+def prepare_command(script_name):
+    """
+    Builds the path of an extension script inside the configured extensions directory and
+    verifies that the file exists.
+    :param str script_name: File name of the script, appended to the extensions directory
+    :return: Path of the existing script file
+    :rtype: str
+    :exception: RuntimeError if the extensions directory property is missing or blank
+    :exception: IOError if the resulting path is not an existing file
+    """
+    extensions_dir = cartridge_agent_config.read_property(
+        cartridgeagentconstants.EXTENSIONS_DIR, False)
+    # NOTE(review): read_property() presumably returns None for a missing property - confirm.
+    # Handling None here reports the intended RuntimeError instead of an AttributeError.
+    if extensions_dir is None or extensions_dir.strip() == "":
+        raise RuntimeError("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR)
+
+    # add a path separator only when the directory does not already end with one
+    separator = "" if extensions_dir.endswith("/") else "/"
+    file_path = extensions_dir + separator + script_name
+
+    if os.path.isfile(file_path):
+        return file_path
+
+    raise IOError("Script file not found : %r" % file_path)
+
+
+def clean_process_parameters(params):
+    """
+    Removes any null valued parameters before passing them to the extension scripts.
+    The given dictionary is modified in place and also returned.
+    :param dict params: Parameters to be cleaned
+    :return: cleaned parameters (the same dictionary object)
+    :rtype: dict
+    """
+    # Collect the keys first: deleting entries while iterating over a live items() view
+    # raises "dictionary changed size during iteration" on Python 3.
+    null_keys = [key for key, value in params.items() if value is None]
+    for key in null_keys:
+        del params[key]
+
+    return params
+
+
+def add_payload_parameters(env_params):
+    """
+    Fills the given dictionary with the parameters shared by all extension scripts: values from
+    the agent configuration, the load balancer IPs and, once the topology is initialized, the
+    service, cluster and member properties. The dictionary is updated in place.
+    :param dict[str, str] env_params: Dictionary to be added
+    :return: Dictionary with updated parameters
+    :rtype: dict[str, str]
+    """
+    env_params.update({
+        "STRATOS_APP_PATH": cartridge_agent_config.app_path,
+        "STRATOS_PARAM_FILE_PATH": cartridge_agent_config.read_property(
+            cartridgeagentconstants.PARAM_FILE_PATH, False),
+        "STRATOS_SERVICE_NAME": cartridge_agent_config.service_name,
+        "STRATOS_TENANT_ID": cartridge_agent_config.tenant_id,
+        "STRATOS_CARTRIDGE_KEY": cartridge_agent_config.cartridge_key,
+        "STRATOS_LB_CLUSTER_ID": cartridge_agent_config.lb_cluster_id,
+        "STRATOS_CLUSTER_ID": cartridge_agent_config.cluster_id,
+        "STRATOS_NETWORK_PARTITION_ID": cartridge_agent_config.network_partition_id,
+        "STRATOS_PARTITION_ID": cartridge_agent_config.partition_id,
+        "STRATOS_PERSISTENCE_MAPPINGS": cartridge_agent_config.persistence_mappings,
+        "STRATOS_REPO_URL": cartridge_agent_config.repo_url,
+    })
+
+    # IPs of an LB member found in the topology win over the ones from the configuration
+    lb_ips = get_lb_member_ip(cartridge_agent_config.lb_cluster_id)
+    if lb_ips is None:
+        lb_ips = [cartridge_agent_config.lb_private_ip, cartridge_agent_config.lb_public_ip]
+    env_params["STRATOS_LB_IP"] = lb_ips[0]
+    env_params["STRATOS_LB_PUBLIC_IP"] = lb_ips[1]
+
+    topology = TopologyContext.get_topology()
+    if not topology.initialized:
+        return env_params
+
+    service = topology.get_service(cartridge_agent_config.service_name)
+    cluster = service.get_cluster(cartridge_agent_config.cluster_id)
+    member = cluster.get_member(cartridge_agent_config.member_id)
+    for source, prefix in ((service, "SERVICE_PROPERTY"),
+                           (cluster, "CLUSTER_PROPERTY"),
+                           (member, "MEMBER_PROPERTY")):
+        add_properties(source.properties, env_params, prefix)
+
+    return env_params
+
+
+def add_properties(properties, params, prefix):
+    """
+    Copies every given property into the parameters dictionary under the key
+    "STRATOS_<prefix>_<property key>", with the value converted to a string.
+    The parameters dictionary is modified in place; nothing is returned.
+    :param dict[str, str] properties: Properties to be added, may be None or empty
+    :param dict[str, str] params: Dictionary receiving the prefixed entries
+    :param str prefix: Text placed between "STRATOS_" and the property key
+    """
+    if not properties:
+        return
+
+    for key in properties:
+        name = "STRATOS_" + prefix + "_" + key
+        value = str(properties[key])
+        params[name] = value
+        # log the key and string value exactly as stored; concatenating the raw value
+        # raised TypeError for non-string property values
+        log.debug("Property added: [key] %s [value] %s" % (name, value))
+
+
+def get_lb_member_ip(lb_cluster_id):
+    """
+    Searches every member of every cluster of every service in the topology for the first
+    member whose cluster id equals the given load balancer cluster id.
+    :param str lb_cluster_id: Cluster id of the load balancer
+    :return: [member ip, member public ip] of the first matching member, None if there is none
+    :rtype: list[str]
+    """
+    for service in TopologyContext.get_topology().get_services():
+        for cluster in service.get_clusters():
+            for member in cluster.get_members():
+                if member.cluster_id != lb_cluster_id:
+                    continue
+
+                return [member.member_ip, member.member_public_ip]
+
+    return None
+
+
+def execute_command(command, env_params=None):
+    """
+    Executes the given command string with given environment parameters
+    :param str command: Path of the program to run, optionally followed by space separated arguments
+    :param dict[str, str] env_params: Environment variables added on top of a copy of the current environment
+    :return: output and error string tuple
+    :rtype: tuple
+    :exception: RuntimeError if the command wrote anything to its error stream
+    """
+    import shlex
+
+    os_env = os.environ.copy()
+    if env_params is not None:
+        os_env.update(env_params)
+
+    # A string that names an existing file is run as it is, so paths containing spaces keep
+    # working. Anything else is split into program and arguments: passing "script arg1 arg2"
+    # as a single list element makes Popen look for a program with that whole name.
+    args = [command] if os.path.isfile(command) else shlex.split(command)
+
+    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os_env)
+    output, errors = p.communicate()
+    log.debug("output = %r" % output)
+    log.debug("error = %r" % errors)
+    # any output on the error stream is treated as a failure; the exit code is not inspected
+    if len(errors) > 0:
+        raise RuntimeError("Command execution failed: \n %r" % errors)
+
+    return output, errors
+
+
+from .. topology.topologycontext import *
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/a11df3ed/tools/python_cartridgeagent/cartridgeagent/modules/util/log.py
----------------------------------------------------------------------
diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/util/log.py b/tools/python_cartridgeagent/cartridgeagent/modules/util/log.py
new file mode 100644
index 0000000..9bad214
--- /dev/null
+++ b/tools/python_cartridgeagent/cartridgeagent/modules/util/log.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import logging.config
+import os
+
+
+class LogFactory(object):
+    """
+    Singleton entry point for logging in CartridgeAgent. Calling LogFactory() does not create
+    a LogFactory object: __new__ returns the single shared instance of the inner class.
+    """
+    class __LogFactory:
+        def __init__(self):
+            self.logs = {}
+            # "logging.ini" is read from the part of this file's path that precedes "modules"
+            base_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0]
+            logging.config.fileConfig(base_dir + "logging.ini")
+
+        def get_log(self, name):
+            try:
+                return self.logs[name]
+            except KeyError:
+                # first request for this channel: create the logger and remember it
+                self.logs[name] = logging.getLogger(name)
+                return self.logs[name]
+
+    # the shared inner instance, created by the first LogFactory() call
+    instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if LogFactory.instance:
+            return LogFactory.instance
+
+        LogFactory.instance = LogFactory.__LogFactory()
+        return LogFactory.instance
+
+    def get_log(self, name):
+        """
+        Returns the logger for the specified channel name. A new logger is created if one
+        doesn't exist yet for the specified channel
+        :param str name: Channel name
+        :return: The logger for the channel
+        :rtype: logging.Logger
+        """
+        return self.instance.get_log(name)
\ No newline at end of file


Mime
View raw message