stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From saj...@apache.org
Subject [5/7] code review changes to cluster monitors
Date Mon, 06 Oct 2014 17:43:11 GMT
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1603aef..e857eaf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,29 +19,16 @@
 
 package org.apache.stratos.autoscaler.message.receiver.topology;
 
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
@@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
                     TopologyManager.acquireReadLock();
                     for (Service service : TopologyManager.getTopology().getServices()) {
@@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                         }
                     }
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-
         });
 
         topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
+                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+                    String clusterId = memberReadyToShutdownEvent.getClusterId();
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
                     AbstractClusterMonitor monitor;
-                    String clusterId = memberReadyToShutdownEvent.getClusterId();
-                    String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    TopologyManager.acquireReadLock();
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                        NetworkPartitionContext nwPartitionCtxt;
-                        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-						nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        // start a new member in the same Partition
-                        String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
-                        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                        // terminate the shutdown ready member
-                        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-                        ccClient.terminate(memberId);
-
-                        // remove from active member list
-                        partitionCtxt.removeActiveMemberById(memberId);
-                        
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member is terminated and removed from the active members list: "
-                            		+ "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-                        }
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
+                    monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
+        });
 
-                } catch (TerminationException e) {
-                    log.error(e);
+        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    log.info("Event received: " + event);
+                    ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
+                    Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
+                    startClusterMonitor(cluster);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
         });
 
-        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
-                    @Override
-                    protected void onEvent(Event event) {
-                        try {
-                            log.info("Event received: " + event);
-                            ClusterCreatedEvent e = (ClusterCreatedEvent) event;
-                            TopologyManager.acquireReadLock();
-                            Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                            Cluster cluster = service.getCluster(e.getClusterId());
-                            startClusterMonitor(cluster);
-                        } catch (Exception e) {
-                            log.error("Error processing event", e);
-                        } finally {
-                            TopologyManager.releaseReadLock();
-                        }
-                    }
-
-                });
-
         topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
                     log.info("Event received: " + event);
-                    ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
+                    ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = (ClusterMaintenanceModeEvent) event;
                     TopologyManager.acquireReadLock();
-                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
-                    if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
-                    	AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
-                    } else {
+                    Service service = TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName());
+                    Cluster cluster = service.getCluster(clusterMaintenanceModeEvent.getClusterId());
+                    AbstractClusterMonitor monitor;
+                    monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+                    if (null == monitor) {
                         log.error("cluster monitor not exists for the cluster: " + cluster.toString());
+                        return;
                     }
+                    monitor.setStatus(clusterMaintenanceModeEvent.getStatus());
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-                });
+        });
 
         topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
-
-                    String clusterId = e.getClusterId();
-                    String deploymentPolicy = e.getDeploymentPolicy();
-
-                    AbstractClusterMonitor monitor = null;
-
-                    if (e.isLbCluster()) {
-                        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-                        if (depPolicy != null) {
-                            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
-                                    .getNetworkPartitionLbHolders(depPolicy);
-
-                            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
-                                // removes lb cluster ids
-                                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
-                                if (isRemoved) {
-                                    log.info("Removed the lb cluster [id]:"
-                                            + clusterId
-                                            + " reference from Network Partition [id]: "
-                                            + networkPartitionLbHolder
-                                            .getNetworkPartitionId());
-
-                                }
-                                if (log.isDebugEnabled()) {
-                                    log.debug(networkPartitionLbHolder);
-                                }
-
-                            }
+                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    AbstractClusterMonitor monitor;
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                    + "[cluster] %s", clusterId));
                         }
+                        return;
                     }
-                    
-                    monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId);                               
-
-                    // runTerminateAllRule(monitor);
-                    if (monitor != null) {
-                        monitor.destroy();
-                        log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
-                                clusterId));
-                    }
+                    monitor.handleClusterRemovedEvent(clusterRemovedEvent);
+                    asCtx.removeClusterMonitor(clusterId);
+                    monitor.destroy();
+                    log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+                                           clusterId));
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
-
         });
 
         topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
@@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
-                    String memberId = e.getMemberId();
+                    MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    String clusterId = memberTerminatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
-                        partitionContext.removeMemberStatsContext(memberId);
-
-                        if (partitionContext.removeTerminationPendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed from termination pending members list: "
-                                		+ "[member] %s", memberId));
-                            }
-                        } else if (partitionContext.removePendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed from pending members list: "
-                                		+ "[member] %s", memberId));
-                            }
-                        } else if (partitionContext.removeActiveMemberById(memberId)) {
-                            log.warn(String.format("Member is in the wrong list and it is removed from "
-                            		+ "active members list", memberId));
-                        } else if (partitionContext.removeObsoleteMember(memberId)){
-                        	log.warn(String.format("Member's obsolated timeout has been expired and "
-                        			+ "it is removed from obsolated members list", memberId));
-                        } else {
-                            log.warn(String.format("Member is not available in any of the list active, "
-                            		+ "pending and termination pending", memberId));
-                        }
-
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been removed successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
-                    
+                    monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
 
@@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberActivatedEvent e = (MemberActivatedEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                    String clusterId = memberActivatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if (monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {    
-                    	PartitionContext partitionContext;
-                        partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-                        partitionContext.movePendingMemberToActiveMembers(memberId);
-					} else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-						KubernetesClusterContext kubernetesClusterContext;
-						kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
-						kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-						kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
-					}
-                    
+                    monitor.handleMemberActivatedEvent(memberActivatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
 
-        topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
-           @Override
-           protected void onEvent(Event event) {
-               try {
-            	   TopologyManager.acquireReadLock();
-            	   
-                   MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
-                   AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                   AbstractClusterMonitor monitor;
-                   String clusterId = memberReadyToShutdownEvent.getClusterId();
-                   String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                   if(asCtx.clusterMonitorExist(clusterId)) {
-                       monitor = asCtx.getClusterMonitor(clusterId);
-                   } else {
-                       if(log.isDebugEnabled()){
-                           log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                           		+ "[cluster] %s", clusterId));
-                       }
-                       return;
-                   }
-
-                   if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		   || monitor.getClusterType() == ClusterType.VMLbCluster) {
-                	   
-                       NetworkPartitionContext nwPartitionCtxt;
-                       String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-                       nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                       // start a new member in the same Partition
-                       String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
-                       PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                       // terminate the shutdown ready member
-                       CloudControllerClient ccClient = CloudControllerClient.getInstance();
-                       ccClient.terminate(memberId);
-
-                       // remove from active member list
-                       partitionCtxt.removeActiveMemberById(memberId);
-
-                       if (log.isInfoEnabled()) {
-                           log.info(String.format("Member is terminated and removed from the active members list: "
-                           		+ "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-                       }
-                   } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	   // no need to do anything
-                   }
-
-               } catch (TerminationException e) {
-                   log.error(e);
-               }
-           }
-
-       });
-
-
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+                    String clusterId = maintenanceModeEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if (asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                 		   || monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                    	PartitionContext partitionContext;
-                    	partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member has been moved as pending termination: "
-                            		+ "[member] %s", memberId));
-                        }
-                        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
-
+                    monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
@@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
-//                try {
-//                    TopologyManager.acquireReadLock();
-//
-//                    // Remove all clusters of given service from context
-//                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
-//                    for(Service service : TopologyManager.getTopology().getServices()) {
-//                        for(Cluster cluster : service.getClusters()) {
-//                            removeMonitor(cluster.getHostName());
-//                        }
-//                    }
-//                }
-//                finally {
-//                    TopologyManager.releaseReadLock();
-//                }
+
             }
         });
     }
 
     private class ClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-        private String clusterMonitorType;
+
         public ClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
@@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 try {
                     monitor = ClusterMonitorFactory.getMonitor(cluster);
                     success = true;
-                    clusterMonitorType = monitor.getClusterType().name();
                 } catch (PolicyValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
-
                 } catch (PartitionValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
                 }
             } while (!success && retries != 0);
 
             if (monitor == null) {
                 String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
-                        + "for cluster: " + cluster.getClusterId();
+                             + "for cluster: " + cluster.getClusterId();
                 log.error(msg);
                 throw new RuntimeException(msg);
             }
-
+            //TODO  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+            //		scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
             Thread th = new Thread(monitor);
             th.start();
             AutoscalerContext.getInstance().addClusterMonitor(monitor);
             if (log.isInfoEnabled()) {
-                log.info(String.format("%s monitor has been added successfully: [cluster] %s",
-                        clusterMonitorType, cluster.getClusterId()));
+                log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
+                                       cluster.getClusterId()));
             }
         }
     }
- 
+
     @SuppressWarnings("unused")
-	private void runTerminateAllRule(VMClusterMonitor monitor) {
+    private void runTerminateAllRule(VMClusterMonitor monitor) {
 
         FactHandle terminateAllFactHandle = null;
 
@@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 
     protected synchronized void startClusterMonitor(Cluster cluster) {
         Thread th = null;
-        if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
-        	th = new Thread(new ClusterMonitorAdder(cluster));
-        } 
+
+        AbstractClusterMonitor monitor;
+        monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+
+        if (null == monitor) {
+            th = new Thread(new ClusterMonitorAdder(cluster));
+        }
         if (th != null) {
             th.start();
             try {
@@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Cluster monitor thread has been started successfully: [cluster] %s ",
-                                cluster.getClusterId()));
+                log.debug(String.format("Cluster monitor thread has been started successfully: "
+                                        + "[cluster] %s ", cluster.getClusterId()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index cb60027..6061c3b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -19,130 +19,211 @@
 package org.apache.stratos.autoscaler.monitor;
 
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
 /*
  * Every cluster monitor, which are monitoring a cluster, should extend this class.
  */
-public abstract class AbstractClusterMonitor implements Runnable{
-	
+public abstract class AbstractClusterMonitor implements Runnable {
+
     private String clusterId;
     private String serviceId;
-    private ClusterType clusterType;
-	private ClusterStatus status;
-	private int monitorInterval;
-	
-	protected FactHandle minCheckFactHandle;
-	protected FactHandle scaleCheckFactHandle;
-	private StatefulKnowledgeSession minCheckKnowledgeSession;
-	private StatefulKnowledgeSession scaleCheckKnowledgeSession;
-	private boolean isDestroyed;
-	
-	private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-	
-	protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, 
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-		
-		super();
-		this.clusterId = clusterId;
-		this.serviceId = serviceId;
-		this.clusterType = clusterType;
-		this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    private ClusterStatus status;
+    private int monitoringIntervalMilliseconds;
+
+    protected FactHandle minCheckFactHandle;
+    protected FactHandle scaleCheckFactHandle;
+    private StatefulKnowledgeSession minCheckKnowledgeSession;
+    private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+    private boolean isDestroyed;
+
+    private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+    protected AbstractClusterMonitor(String clusterId, String serviceId,
+                                     AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+        super();
+        this.clusterId = clusterId;
+        this.serviceId = serviceId;
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
         this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
         this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
-	}
+    }
+
+    protected abstract void readConfigurations();
+
+    protected abstract void monitor();
 
-	protected abstract void readConfigurations();
-	protected abstract void monitor();
     public abstract void destroy();
-    
-	public String getClusterId() {
-		return clusterId;
-	}
-	
-	public void setClusterId(String clusterId) {
-		this.clusterId = clusterId;
-	}
-	
-	public void setStatus(ClusterStatus status) {
-		this.status = status;
-	}
-
-	public ClusterType getClusterType() {
-		return clusterType;
-	}
-
-	public ClusterStatus getStatus() {
-		return status;
-	}
-	
-	public String getServiceId() {
-		return serviceId;
-	}
-	
-	public void setServiceId(String serviceId) {
-		this.serviceId = serviceId;
-	}
-	
-	public int getMonitorInterval() {
-		return monitorInterval;
-	}
-	
-	public void setMonitorInterval(int monitorInterval) {
-		this.monitorInterval = monitorInterval;
-	}
-
-	public FactHandle getMinCheckFactHandle() {
-		return minCheckFactHandle;
-	}
-	
-	public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
-		this.minCheckFactHandle = minCheckFactHandle;
-	}
-	
-	public FactHandle getScaleCheckFactHandle() {
-		return scaleCheckFactHandle;
-	}
-	
-	public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
-		this.scaleCheckFactHandle = scaleCheckFactHandle;
-	}
-	
-	public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
-		return minCheckKnowledgeSession;
-	}
-	
-	public void setMinCheckKnowledgeSession(
-			StatefulKnowledgeSession minCheckKnowledgeSession) {
-		this.minCheckKnowledgeSession = minCheckKnowledgeSession;
-	}
-	
-	public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
-		return scaleCheckKnowledgeSession;
-	}
-	
-	public void setScaleCheckKnowledgeSession(
-			StatefulKnowledgeSession scaleCheckKnowledgeSession) {
-		this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
-	}
-	
-	public boolean isDestroyed() {
-		return isDestroyed;
-	}
-	
-	public void setDestroyed(boolean isDestroyed) {
-		this.isDestroyed = isDestroyed;
-	}
-
-	public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
-		return autoscalerRuleEvaluator;
-	}
-
-	public void setAutoscalerRuleEvaluator(
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-		this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
-	}
+
+    //handle health events
+    public abstract void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent);
+
+    public abstract void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+    public abstract void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+    public abstract void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+    public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+    public abstract void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+    public abstract void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+    public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+    public abstract void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+    public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+    public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+    public abstract void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+    public abstract void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+    public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+    //handle topology events
+    public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+    public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+    public abstract void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent);
+
+    public abstract void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+    public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+    public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setStatus(ClusterStatus status) {
+        this.status = status;
+    }
+
+    public ClusterStatus getStatus() {
+        return status;
+    }
+
+    public String getServiceId() {
+        return serviceId;
+    }
+
+    public void setServiceId(String serviceId) {
+        this.serviceId = serviceId;
+    }
+
+    public int getMonitorIntervalMilliseconds() {
+        return monitoringIntervalMilliseconds;
+    }
+
+    public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+        this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+    }
+
+    public FactHandle getMinCheckFactHandle() {
+        return minCheckFactHandle;
+    }
+
+    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+        this.minCheckFactHandle = minCheckFactHandle;
+    }
+
+    public FactHandle getScaleCheckFactHandle() {
+        return scaleCheckFactHandle;
+    }
+
+    public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+        this.scaleCheckFactHandle = scaleCheckFactHandle;
+    }
+
+    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+        return minCheckKnowledgeSession;
+    }
+
+    public void setMinCheckKnowledgeSession(
+            StatefulKnowledgeSession minCheckKnowledgeSession) {
+        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+    }
+
+    public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+        return scaleCheckKnowledgeSession;
+    }
+
+    public void setScaleCheckKnowledgeSession(
+            StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+        this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+    }
+
+    public boolean isDestroyed() {
+        return isDestroyed;
+    }
+
+    public void setDestroyed(boolean isDestroyed) {
+        this.isDestroyed = isDestroyed;
+    }
+
+    public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+        return autoscalerRuleEvaluator;
+    }
+
+    public void setAutoscalerRuleEvaluator(
+            AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index bd01dc6..208e4ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants;
  * Factory class for creating cluster monitors.
  */
 public class ClusterMonitorFactory {
-	
-	private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
-	/**
-	 * @param cluster the cluster to be monitored
-	 * @return the created cluster monitor
-	 * @throws PolicyValidationException when deployment policy is not valid
-	 * @throws PartitionValidationException when partition is not valid
-	 */
-	public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-		
-		AbstractClusterMonitor clusterMonitor;
-		if(cluster.isKubernetesCluster()){
-			clusterMonitor = getDockerServiceClusterMonitor(cluster);
-		} else if (cluster.isLbCluster()){
-			clusterMonitor = getVMLbClusterMonitor(cluster);
-		} else {
-			clusterMonitor = getVMServiceClusterMonitor(cluster);
-		}
-		
-		return clusterMonitor;
-	}
-	
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+
+    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor
+     * @throws PolicyValidationException    when deployment policy is not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        AbstractClusterMonitor clusterMonitor;
+        if (cluster.isKubernetesCluster()) {
+            clusterMonitor = getDockerServiceClusterMonitor(cluster);
+        } else if (cluster.isLbCluster()) {
+            clusterMonitor = getVMLbClusterMonitor(cluster);
+        } else {
+            clusterMonitor = getVMServiceClusterMonitor(cluster);
+        }
+
+        return clusterMonitor;
+    }
+
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -91,11 +93,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              .getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         .getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -106,8 +108,8 @@ public class ClusterMonitorFactory {
         Partition[] allPartitions = deploymentPolicy.getAllPartitions();
         if (allPartitions == null) {
             String msg =
-                         "Deployment Policy's Partitions are null. Policy name: " +
-                                 deploymentPolicyName;
+                    "Deployment Policy's Partitions are null. Policy name: " +
+                    deploymentPolicyName;
             log.error(msg);
             throw new PolicyValidationException(msg);
         }
@@ -115,98 +117,100 @@ public class ClusterMonitorFactory {
         CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
 
         VMServiceClusterMonitor clusterMonitor =
-                                        new VMServiceClusterMonitor(cluster.getClusterId(),
-                                                           cluster.getServiceName(),
-                                                           deploymentPolicy, policy);
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
-        
-        for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
+
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
 
-            for(Partition partition: partitionGroup.getPartitions()){
+            for (Partition partition : partitionGroup.getPartitions()) {
                 PartitionContext partitionContext = new PartitionContext(partition);
                 partitionContext.setServiceName(cluster.getServiceName());
                 partitionContext.setProperties(cluster.getProperties());
                 partitionContext.setNetworkPartitionId(partitionGroup.getId());
-                
-                for (Member member: cluster.getMembers()){
+
+                for (Member member : cluster.getMembers()) {
                     String memberId = member.getMemberId();
-                    if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
                         MemberContext memberContext = new MemberContext();
                         memberContext.setClusterId(member.getClusterId());
                         memberContext.setMemberId(memberId);
                         memberContext.setPartition(partition);
                         memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-                        
-                        if(MemberStatus.Activated.equals(member.getStatus())){
+
+                        if (MemberStatus.Activated.equals(member.getStatus())) {
                             partitionContext.addActiveMember(memberContext);
 //                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
 //                            partitionContext.incrementCurrentActiveMemberCount(1);
 
-                        } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
+                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
                             partitionContext.addPendingMember(memberContext);
 
 //                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                        } else if(MemberStatus.Suspended.equals(member.getStatus())){
+                        } else if (MemberStatus.Suspended.equals(member.getStatus())) {
 //                            partitionContext.addFaultyMember(memberId);
                         }
                         partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if(log.isInfoEnabled()){
+                        if (log.isInfoEnabled()) {
                             log.info(String.format("Member stat context has been added: [member] %s", memberId));
                         }
                     }
 
                 }
                 networkPartitionContext.addPartitionContext(partitionContext);
-                if(log.isInfoEnabled()){
+                if (log.isInfoEnabled()) {
                     log.info(String.format("Partition context has been added: [partition] %s",
-                            partitionContext.getPartitionId()));
+                                           partitionContext.getPartitionId()));
                 }
             }
 
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-            if(log.isInfoEnabled()){
+            if (log.isInfoEnabled()) {
                 log.info(String.format("Network partition context has been added: [network partition] %s",
-                            networkPartitionContext.getId()));
+                                       networkPartitionContext.getId()));
             }
         }
-        
-        
+
+
         // find lb reference type
         java.util.Properties props = cluster.getProperties();
-        
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             clusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
+
         // set hasPrimary property
         // hasPrimary is true if there are primary members available in that cluster
         clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
 
-        log.info("VMServiceClusterMonitor created: "+clusterMonitor.toString());
+        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-    
+
     private static Properties convertMemberPropsToMemberContextProps(
-			java.util.Properties properties) {
-    	Properties props = new Properties();
-    	for (Map.Entry<Object, Object> e : properties.entrySet()	) {
-			Property prop = new Property();
-			prop.setName((String)e.getKey());
-			prop.setValue((String)e.getValue());
-			props.addProperties(prop);
-		}    	
-		return props;
-	}
-
-
-	private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -222,11 +226,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              .getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         .getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -236,21 +240,21 @@ public class ClusterMonitorFactory {
 
         String clusterId = cluster.getClusterId();
         VMLbClusterMonitor clusterMonitor =
-                                        new VMLbClusterMonitor(clusterId,
-                                                           cluster.getServiceName(),
-                                                           deploymentPolicy, policy);
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
         // partition group = network partition context
         for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionLbHolder networkPartitionLbHolder =
-                                                              PartitionManager.getInstance()
-                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
+                    PartitionManager.getInstance()
+                            .getNetworkPartitionLbHolder(partitionGroup.getId());
 //                                                              PartitionManager.getInstance()
 //                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
             // FIXME pick a random partition
             Partition partition =
-                                  partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
             PartitionContext partitionContext = new PartitionContext(partition);
             partitionContext.setServiceName(cluster.getServiceName());
             partitionContext.setProperties(cluster.getProperties());
@@ -258,7 +262,8 @@ public class ClusterMonitorFactory {
             partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
 
             NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
             for (Member member : cluster.getMembers()) {
                 String memberId = member.getMemberId();
                 if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
@@ -280,23 +285,23 @@ public class ClusterMonitorFactory {
                     }
 
                     partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                    if(log.isInfoEnabled()){
+                    if (log.isInfoEnabled()) {
                         log.info(String.format("Member stat context has been added: [member] %s", memberId));
                     }
                 }
 
             }
             networkPartitionContext.addPartitionContext(partitionContext);
-            
+
             // populate lb cluster id in network partition context.
             java.util.Properties props = cluster.getProperties();
 
             // get service type of load balanced cluster
             String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-            
-            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
                 String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-                
+
                 if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
                     networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
 
@@ -317,13 +322,17 @@ public class ClusterMonitorFactory {
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
         }
 
-        log.info("VMLbClusterMonitor created: "+clusterMonitor.toString());
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-	
-    private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
 
-    	if (null == cluster) {
+    /**
+     * @param cluster - the cluster which needs to be monitored
+     * @return - the cluster monitor
+     */
+    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
+
+        if (null == cluster) {
             return null;
         }
 
@@ -335,42 +344,43 @@ public class ClusterMonitorFactory {
         AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
         java.util.Properties props = cluster.getProperties();
         String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-		KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, 
-				cluster.getClusterId());
-
-        DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor(
-        		kubernetesClusterCtxt, 
-        		cluster.getClusterId(), 
-        		cluster.getServiceName(), 
-        		policy);
-                                        
+        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+                                                                                      cluster.getClusterId());
+
+        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy);
+
         dockerClusterMonitor.setStatus(ClusterStatus.Created);
-        
-		for (Member member : cluster.getMembers()) {
-			String memberId = member.getMemberId();
-			String clusterId = member.getClusterId();
-			MemberContext memberContext = new MemberContext();
-			memberContext.setMemberId(memberId);
-			memberContext.setClusterId(clusterId);
-
-			if (MemberStatus.Activated.equals(member.getStatus())) {
-				dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
-			} else if (MemberStatus.Created.equals(member.getStatus())
-					|| MemberStatus.Starting.equals(member.getStatus())) {
-				dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-			}
-		}
+
+        //populate the members after restarting        
+        for (Member member : cluster.getMembers()) {
+            String memberId = member.getMemberId();
+            String clusterId = member.getClusterId();
+            MemberContext memberContext = new MemberContext();
+            memberContext.setMemberId(memberId);
+            memberContext.setClusterId(clusterId);
+
+            if (MemberStatus.Activated.equals(member.getStatus())) {
+                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+            } else if (MemberStatus.Created.equals(member.getStatus())
+                       || MemberStatus.Starting.equals(member.getStatus())) {
+                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+            }
+        }
 
         // find lb reference type
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             dockerClusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
-        log.info("KubernetesServiceClusterMonitor created: "+ dockerClusterMonitor.toString());
+
+        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
         return dockerClusterMonitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
deleted file mode 100644
index 2621690..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
-
-/*
- * Every container cluster monitor should extend this class
- */
-public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
-
-	private KubernetesClusterContext kubernetesClusterCtxt;
-	protected AutoscalePolicy autoscalePolicy;
-	
-	protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, 
-			KubernetesClusterContext kubernetesClusterContext,
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){
-		
-		super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
-		this.kubernetesClusterCtxt = kubernetesClusterContext;
-		this.autoscalePolicy = autoscalePolicy;
-	}
-    
-	public KubernetesClusterContext getKubernetesClusterCtxt() {
-		return kubernetesClusterCtxt;
-	}
-
-	public void setKubernetesClusterCtxt(
-			KubernetesClusterContext kubernetesClusterCtxt) {
-		this.kubernetesClusterCtxt = kubernetesClusterCtxt;
-	}
-	
-	public AutoscalePolicy getAutoscalePolicy() {
-		return autoscalePolicy;
-	}
-
-	public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-		this.autoscalePolicy = autoscalePolicy;
-	}
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
deleted file mode 100644
index 850a295..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Properties;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.common.enums.ClusterType;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{
-	
-	private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class);
-
-	private String lbReferenceType;
-    private int numberOfReplicasInServiceCluster = 0;
-	int retryInterval = 60000;
-	
-    public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt, 
-    		String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) {
-    	super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt,
-    			new AutoscalerRuleEvaluator(), autoscalePolicy);
-        readConfigurations();
-    }
-
-	@Override
-	public void run() {
-		try {
-			// TODO make this configurable,
-			// this is the delay the min check of normal cluster monitor to wait
-			// until LB monitor is added
-			Thread.sleep(60000);
-		} catch (InterruptedException ignore) {
-		}
-
-		while (!isDestroyed()) {
-			if (log.isDebugEnabled()) {
-				log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
-			}
-			try {
-				if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-					monitor();
-				} else {
-					if (log.isDebugEnabled()) {
-						log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-								+ ClusterStatus.In_Maintenance + " mode......");
-					}
-				}
-			} catch (Exception e) {
-				log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
-						e);
-			}
-			try {
-				Thread.sleep(getMonitorInterval());
-			} catch (InterruptedException ignore) {
-			}
-		}
-	}
-	
-	@Override
-	protected void monitor() {
-		
-	    // is container created successfully?
-		boolean success = false;
-		String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
-		
-		try {
-			TopologyManager.acquireReadLock();
-			Properties props = TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
-			int minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
-			
-			int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size() + getKubernetesClusterCtxt().getPendingMembers().size();
-
-			if (nonTerminatedMembers == 0) {
-				
-				while (!success) {
-					try {
-
-						MemberContext memberContext = CloudControllerClient.getInstance().createContainer(kubernetesClusterId, getClusterId());
-						if(null != memberContext) {
-							getKubernetesClusterCtxt().addPendingMember(memberContext);
-							success = true;
-							numberOfReplicasInServiceCluster = minReplicas;
-							if(log.isDebugEnabled()){
-								log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", 
-										memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID()));
-							}
-						} else {
-							if (log.isDebugEnabled()) {
-								log.debug("Returned member context is null, did not add to pending members");
-							}
-						}
-					} catch (Throwable e) {
-						if (log.isDebugEnabled()) {
-							String message = "Cannot create a container, will retry in "+(retryInterval/1000)+"s";
-							log.debug(message, e);
-						}
-					}
-					
-	                try {
-	                    Thread.sleep(retryInterval);
-	                } catch (InterruptedException e1) {
-	                }
-				}
-			}
-		} finally {
-			TopologyManager.releaseReadLock();
-		}
-	}
-	
-	@Override
-	public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. "+this.toString());
-        }		
-	}
-	
-    @Override
-    protected void readConfigurations () {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        setMonitorInterval(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorInterval());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-        		+ "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
-        		+ ", clusterId=" + getClusterId() 
-        		+ ", serviceId=" + getServiceId() + "]";
-    }
-    
-	public String getLbReferenceType() {
-		return lbReferenceType;
-	}
-
-	public void setLbReferenceType(String lbReferenceType) {
-		this.lbReferenceType = lbReferenceType;
-	}
-}
\ No newline at end of file


Mime
View raw message