helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [2/2] helix git commit: Fix some race condition issues in MBean management classes. Fix the listener adding sequence.
Date Wed, 30 Jan 2019 19:28:34 GMT
Fix some race condition issues in MBean management classes. Fix the listener adding sequence.

Stabilized the TestClusterStatusMonitorLifecycle accordingly.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9ec24e9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9ec24e9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9ec24e9

Branch: refs/heads/master
Commit: f9ec24e994afbf834a4454dcb2cbe48af71e24dd
Parents: 0e0fcb9
Author: jiajunwang <ericwang1985@gmail.com>
Authored: Tue Jan 29 17:33:31 2019 -0800
Committer: Jiajun Wang <jjwang@jjwang-ld2.linkedin.biz>
Committed: Wed Jan 30 11:26:21 2019 -0800

----------------------------------------------------------------------
 .../helix/controller/HelixControllerMain.java   |   2 +-
 .../manager/zk/ControllerManagerHelper.java     |   9 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 406 +++++++++++--------
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../TestDistributedControllerManager.java       |   2 +-
 .../TestClusterEventStatusMonitor.java          |   8 +-
 .../TestClusterStatusMonitorLifecycle.java      |  19 +-
 7 files changed, 256 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 5265c24..d8e5a3f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -133,12 +133,12 @@ public class HelixControllerMain {
   public static void addListenersToController(HelixManager manager,
       GenericHelixController controller) {
     try {
+      manager.addControllerListener(controller);
       manager.addInstanceConfigChangeListener(controller);
       manager.addResourceConfigChangeListener(controller);
       manager.addClusterfigChangeListener(controller);
       manager.addLiveInstanceChangeListener(controller);
       manager.addIdealStateChangeListener(controller);
-      manager.addControllerListener(controller);
     } catch (ZkInterruptedException e) {
       logger
           .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 1df53ec..b8202d9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -79,12 +79,12 @@ public class ControllerManagerHelper {
       /**
        * setup generic-controller
        */
+      _manager.addControllerListener(controller);
       _manager.addInstanceConfigChangeListener(controller);
       _manager.addResourceConfigChangeListener(controller);
       _manager.addClusterfigChangeListener(controller);
       _manager.addLiveInstanceChangeListener(controller);
       _manager.addIdealStateChangeListener(controller);
-      _manager.addControllerListener(controller);
     } catch (ZkInterruptedException e) {
     LOG.warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
           + e);
@@ -98,10 +98,11 @@ public class ControllerManagerHelper {
     /**
      * reset generic-controller
      */
-    _manager.removeListener(keyBuilder.instanceConfigs(), controller);
-    _manager.removeListener(keyBuilder.resourceConfigs(), controller);
-    _manager.removeListener(keyBuilder.liveInstances(), controller);
     _manager.removeListener(keyBuilder.idealStates(), controller);
+    _manager.removeListener(keyBuilder.liveInstances(), controller);
+    _manager.removeListener(keyBuilder.clusterConfig(), controller);
+    _manager.removeListener(keyBuilder.resourceConfigs(), controller);
+    _manager.removeListener(keyBuilder.instanceConfigs(), controller);
     _manager.removeListener(keyBuilder.controller(), controller);
 
     /**

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index a980d3c..1455032 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,18 +87,20 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
-  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMonitorMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMonitorMap =
+      new ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
-  protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap =
+  protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMonitorMap =
       new ConcurrentHashMap<>();
 
   /**
-   * PerInstanceResource bean map: beanName->bean
+   * PerInstanceResource monitor map: beanName->monitor
    */
-  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
-      new ConcurrentHashMap<>();
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>
+      _perInstanceResourceMonitorMap = new ConcurrentHashMap<>();
 
   private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap = new ConcurrentHashMap<>();
 
@@ -227,60 +230,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param disabledPartitions a map of instance name to the set of partitions disabled on
it
    * @param tags a map of instance name to the set of tags on it
    */
-  public synchronized void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String>
instanceSet,
+  public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String>
instanceSet,
       Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>>
disabledPartitions,
       Map<String, List<String>> oldDisabledPartitions, Map<String, Set<String>>
tags) {
-    // Unregister beans for instances that are no longer configured
-    Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());
-    toUnregister.removeAll(instanceSet);
-    try {
-      unregisterInstances(toUnregister);
-    } catch (MalformedObjectNameException e) {
-      LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
-    }
-
-    // Register beans for instances that are newly configured
-    Set<String> toRegister = Sets.newHashSet(instanceSet);
-    toRegister.removeAll(_instanceMbeanMap.keySet());
-    Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
-    for (String instanceName : toRegister) {
-      InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
-      bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-          oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
-          !disabledInstanceSet.contains(instanceName));
-      monitorsToRegister.add(bean);
-    }
-    try {
-      registerInstances(monitorsToRegister);
-    } catch (MalformedObjectNameException e) {
-      LOG.error("Could not register instances with MBean server: " + toRegister, e);
-    }
-
-    // Update all the sets
-    _instances = instanceSet;
-    _liveInstances = liveInstanceSet;
-    _disabledInstances = disabledInstanceSet;
-    _disabledPartitions = disabledPartitions;
-    _oldDisabledPartitions = oldDisabledPartitions;
-
-    // Update the instance MBeans
-    for (String instanceName : instanceSet) {
-      if (_instanceMbeanMap.containsKey(instanceName)) {
-        // Update the bean
-        InstanceMonitor bean = _instanceMbeanMap.get(instanceName);
-        String oldSensorName = bean.getSensorName();
+    synchronized (_instanceMonitorMap) {
+      // Unregister beans for instances that are no longer configured
+      Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
+      toUnregister.removeAll(instanceSet);
+      try {
+        unregisterInstances(toUnregister);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
+      }
+
+      // Register beans for instances that are newly configured
+      Set<String> toRegister = Sets.newHashSet(instanceSet);
+      toRegister.removeAll(_instanceMonitorMap.keySet());
+      Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
+      for (String instanceName : toRegister) {
+        InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
         bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
-            !disabledInstanceSet.contains(instanceName));
-
-        // If the sensor name changed, re-register the bean so that listeners won't miss
it
-        String newSensorName = bean.getSensorName();
-        if (!oldSensorName.equals(newSensorName)) {
-          try {
-            unregisterInstances(Arrays.asList(instanceName));
-            registerInstances(Arrays.asList(bean));
-          } catch (MalformedObjectNameException e) {
-            LOG.error("Could not refresh registration with MBean server: " + instanceName,
e);
+            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
!disabledInstanceSet.contains(instanceName));
+        monitorsToRegister.add(bean);
+      }
+      try {
+        registerInstances(monitorsToRegister);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Could not register instances with MBean server: " + toRegister, e);
+      }
+
+      // Update all the sets
+      _instances = instanceSet;
+      _liveInstances = liveInstanceSet;
+      _disabledInstances = disabledInstanceSet;
+      _disabledPartitions = disabledPartitions;
+      _oldDisabledPartitions = oldDisabledPartitions;
+
+      // Update the instance MBeans
+      for (String instanceName : instanceSet) {
+        if (_instanceMonitorMap.containsKey(instanceName)) {
+          // Update the bean
+          InstanceMonitor bean = _instanceMonitorMap.get(instanceName);
+          String oldSensorName = bean.getSensorName();
+          bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+              oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
!disabledInstanceSet.contains(instanceName));
+
+          // If the sensor name changed, re-register the bean so that listeners won't miss
it
+          String newSensorName = bean.getSensorName();
+          if (!oldSensorName.equals(newSensorName)) {
+            try {
+              unregisterInstances(Arrays.asList(instanceName));
+              registerInstances(Arrays.asList(bean));
+            } catch (MalformedObjectNameException e) {
+              LOG.error("Could not refresh registration with MBean server: " + instanceName,
e);
+            }
           }
         }
       }
@@ -302,12 +305,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
     try {
-      if (!_clusterEventMbeanMap.containsKey(phase)) {
-        synchronized (this) {
-          if (!_clusterEventMbeanMap.containsKey(phase)) {
+      if (!_clusterEventMonitorMap.containsKey(phase)) {
+        synchronized (_clusterEventMonitorMap) {
+          if (!_clusterEventMonitorMap.containsKey(phase)) {
             ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
             monitor.register();
-            _clusterEventMbeanMap.put(phase, monitor);
+            _clusterEventMonitorMap.put(phase, monitor);
           }
         }
       }
@@ -316,7 +319,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           + " and phase type: " + phase, e);
     }
 
-    return _clusterEventMbeanMap.get(phase);
+    return _clusterEventMonitorMap.get(phase);
   }
 
   /**
@@ -349,13 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     // Update message count per instance and per resource
     for (String instance : messageCountPerInstance.keySet()) {
-      if (_instanceMbeanMap.containsKey(instance)) {
-        _instanceMbeanMap.get(instance).increaseMessageCount(messageCountPerInstance.get(instance));
+      InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instance);
+      if (instanceMonitor != null) {
+        instanceMonitor.increaseMessageCount(messageCountPerInstance.get(instance));
       }
     }
     for (String resource : messageCountPerResource.keySet()) {
-      if (_resourceMbeanMap.containsKey(resource)) {
-        _resourceMbeanMap.get(resource).increaseMessageCount(messageCountPerResource.get(resource));
+      ResourceMonitor resourceMonitor = _resourceMonitorMap.get(resource);
+      if (resourceMonitor != null) {
+        resourceMonitor.increaseMessageCount(messageCountPerResource.get(resource));
       }
     }
   }
@@ -389,9 +394,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         }
       }
     }
-    synchronized (_perInstanceResourceMap) {
+    synchronized (_perInstanceResourceMonitorMap) {
       // Unregister beans for per-instance resources that no longer exist
-      Set<PerInstanceResourceMonitor.BeanName> toUnregister = Sets.newHashSet(_perInstanceResourceMap.keySet());
+      Set<PerInstanceResourceMonitor.BeanName> toUnregister = Sets.newHashSet(
+          _perInstanceResourceMonitorMap.keySet());
       toUnregister.removeAll(beanMap.keySet());
       try {
         unregisterPerInstanceResources(toUnregister);
@@ -400,7 +406,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       // Register beans for per-instance resources that are newly configured
       Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
-      toRegister.removeAll(_perInstanceResourceMap.keySet());
+      toRegister.removeAll(_perInstanceResourceMonitorMap.keySet());
       Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
       for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
         PerInstanceResourceMonitor bean =
@@ -416,8 +422,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         LOG.error("Fail to register per-instance resource with MBean server: " + toRegister,
e);
       }
       // Update existing beans
-      for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet())
{
-        PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+      for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMonitorMap.keySet())
{
+        PerInstanceResourceMonitor bean = _perInstanceResourceMonitorMap.get(beanName);
         String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
         bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), stateModelDefMap.get(stateModelDefName));
@@ -431,9 +437,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    */
   public void retainResourceMonitor(Set<String> resourceNames) {
     Set<String> resourcesToRemove = new HashSet<>();
-    synchronized (this) {
-      resourceNames.retainAll(_resourceMbeanMap.keySet());
-      resourcesToRemove.addAll(_resourceMbeanMap.keySet());
+    synchronized (_resourceMonitorMap) {
+      resourceNames.retainAll(_resourceMonitorMap.keySet());
+      resourcesToRemove.addAll(_resourceMonitorMap.keySet());
     }
     resourcesToRemove.removeAll(resourceNames);
 
@@ -446,7 +452,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     try {
       unregisterResources(resourcesToRemove);
-    } catch (MalformedObjectNameException e) {
+    } catch (Exception e) {
       LOG.error(String.format("Could not unregister beans for the following resources: %s",
           Joiner.on(',').join(resourcesToRemove)), e);
     }
@@ -466,7 +472,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public synchronized void updateMissingTopStateDurationStats(String resourceName,
+  public void updateMissingTopStateDurationStats(String resourceName,
       long totalDuration, long helixLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
@@ -490,13 +496,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
-      if (!_resourceMbeanMap.containsKey(resourceName)) {
-        synchronized (this) {
-          if (!_resourceMbeanMap.containsKey(resourceName)) {
+      if (!_resourceMonitorMap.containsKey(resourceName)) {
+        synchronized (_resourceMonitorMap) {
+          if (!_resourceMonitorMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
                 new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
-            _resourceMbeanMap.put(resourceName, bean);
+            _resourceMonitorMap.put(resourceName, bean);
           }
         }
       }
@@ -504,11 +510,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       LOG.error("Fail to register resource mbean, resource: " + resourceName);
     }
 
-    return _resourceMbeanMap.get(resourceName);
+    return _resourceMonitorMap.get(resourceName);
   }
 
   public void resetMaxMissingTopStateGauge() {
-    for (ResourceMonitor monitor : _resourceMbeanMap.values()) {
+    for (ResourceMonitor monitor : _resourceMonitorMap.values()) {
       monitor.resetMaxTopStateHandoffGauge();
     }
   }
@@ -529,19 +535,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void reset() {
     LOG.info("Reset ClusterStatusMonitor");
     try {
-      unregisterResources(_resourceMbeanMap.keySet());
-
-      _resourceMbeanMap.clear();
+      unregisterAllResources();
       _instanceMsgQueueSizes.clear();
-
-      unregisterInstances(_instanceMbeanMap.keySet());
-      _instanceMbeanMap.clear();
-
-      unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+      unregisterAllInstances();
+      unregisterAllPerInstanceResources();
       unregister(getObjectName(clusterBeanName()));
-      unregisterEventMonitors(_clusterEventMbeanMap.values());
-      unregisterWorkflows(_perTypeWorkflowMonitorMap.keySet());
-      unregisterJobs(_perTypeJobMonitorMap.keySet());
+      unregisterAllEventMonitors();
+      unregisterAllWorkflowsMonitor();
+      unregisterAllJobs();
 
       _rebalanceFailure = false;
     } catch (Exception e) {
@@ -550,8 +551,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   public void refreshWorkflowsStatus(TaskDriver driver) {
-    for (WorkflowMonitor workflowMonitor : _perTypeWorkflowMonitorMap.values()) {
-      workflowMonitor.resetGauges();
+    for (Map.Entry<String, WorkflowMonitor> workflowMonitor : _perTypeWorkflowMonitorMap
+        .entrySet()) {
+      workflowMonitor.getValue().resetGauges();
     }
 
     Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
@@ -571,13 +573,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to, long latency)
{
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to, latency);
+    WorkflowMonitor workflowMonitor = _perTypeWorkflowMonitorMap.get(workflowType);
+    if (workflowMonitor != null) {
+      workflowMonitor.updateWorkflowCounters(to, latency);
+    }
   }
 
   private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) {
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowGauges(current);
+    WorkflowMonitor workflowMonitor = _perTypeWorkflowMonitorMap.get(workflowType);
+    if (workflowMonitor != null) {
+      workflowMonitor.updateWorkflowGauges(current);
+    }
   }
 
   private String preProcessWorkflow(String workflowType) {
@@ -585,21 +593,23 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       workflowType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
 
-    if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
-      WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
-      try {
-        registerWorkflow(monitor);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Failed to register object for workflow type : " + workflowType, e);
+    synchronized (_perTypeWorkflowMonitorMap) {
+      if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
+        WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
+        try {
+          registerWorkflow(monitor);
+        } catch (MalformedObjectNameException e) {
+          LOG.error("Failed to register object for workflow type : " + workflowType, e);
+        }
+        _perTypeWorkflowMonitorMap.put(workflowType, monitor);
       }
-      _perTypeWorkflowMonitorMap.put(workflowType, monitor);
     }
     return workflowType;
   }
 
   public void refreshJobsStatus(TaskDriver driver) {
-    for (JobMonitor jobMonitor : _perTypeJobMonitorMap.values()) {
-      jobMonitor.resetJobGauge();
+    for (Map.Entry<String, JobMonitor> jobMonitor : _perTypeJobMonitorMap.entrySet())
{
+      jobMonitor.getValue().resetJobGauge();
     }
     for (String workflow : driver.getWorkflows().keySet()) {
       if (workflow.isEmpty()) {
@@ -626,14 +636,20 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void updateJobCounters(JobConfig jobConfig, TaskState to, long latency) {
     String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to, latency);
+    JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
+    if (jobMonitor != null) {
+      jobMonitor.updateJobCounters(to, latency);
+    }
   }
 
   private void updateJobGauges(String jobType, TaskState current) {
     // When first time for WorkflowRebalancer call, jobconfig may not ready.
     // Thus only check it for gauge.
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
+    JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
+    if (jobMonitor != null) {
+      jobMonitor.updateJobGauge(current);
+    }
   }
 
   private String preProcessJobMonitor(String jobType) {
@@ -641,114 +657,154 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       jobType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
 
-    if (!_perTypeJobMonitorMap.containsKey(jobType)) {
-      JobMonitor monitor = new JobMonitor(_clusterName, jobType);
-      try {
-        registerJob(monitor);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Failed to register job type : " + jobType, e);
+    synchronized (_perTypeJobMonitorMap) {
+      if (!_perTypeJobMonitorMap.containsKey(jobType)) {
+        JobMonitor monitor = new JobMonitor(_clusterName, jobType);
+        try {
+          registerJob(monitor);
+        } catch (MalformedObjectNameException e) {
+          LOG.error("Failed to register job type : " + jobType, e);
+        }
+        _perTypeJobMonitorMap.put(jobType, monitor);
       }
-      _perTypeJobMonitorMap.put(jobType, monitor);
     }
     return jobType;
   }
 
-  private synchronized void registerInstances(Collection<InstanceMonitor> instances)
+  private void registerInstances(Collection<InstanceMonitor> instances)
       throws MalformedObjectNameException {
-    for (InstanceMonitor monitor : instances) {
-      String instanceName = monitor.getInstanceName();
-      String beanName = getInstanceBeanName(instanceName);
-      register(monitor, getObjectName(beanName));
-      _instanceMbeanMap.put(instanceName, monitor);
+    synchronized (_instanceMonitorMap) {
+      for (InstanceMonitor monitor : instances) {
+        String instanceName = monitor.getInstanceName();
+        String beanName = getInstanceBeanName(instanceName);
+        register(monitor, getObjectName(beanName));
+        _instanceMonitorMap.put(instanceName, monitor);
+      }
+    }
+  }
+
+  private void unregisterAllInstances() throws MalformedObjectNameException {
+    synchronized (_instanceMonitorMap) {
+      unregisterInstances(_instanceMonitorMap.keySet());
+    }
+  }
+
+  private void unregisterInstances(Collection<String> instances) throws MalformedObjectNameException
{
+    synchronized (_instanceMonitorMap) {
+      for (String instanceName : instances) {
+        String beanName = getInstanceBeanName(instanceName);
+        unregister(getObjectName(beanName));
+      }
+      _instanceMonitorMap.keySet().removeAll(instances);
+    }
+  }
+
+  private void registerResources(Collection<String> resources) throws JMException {
+    synchronized (_resourceMonitorMap) {
+      for (String resourceName : resources) {
+        ResourceMonitor monitor = _resourceMonitorMap.get(resourceName);
+        if (monitor != null) {
+          monitor.register();
+        }
+      }
     }
   }
 
-  private synchronized void unregisterInstances(Collection<String> instances) throws
MalformedObjectNameException {
-    for (String instanceName : instances) {
-      String beanName = getInstanceBeanName(instanceName);
-      unregister(getObjectName(beanName));
+  private void unregisterAllResources() {
+    synchronized (_resourceMonitorMap) {
+      unregisterResources(_resourceMonitorMap.keySet());
     }
-    _instanceMbeanMap.keySet().removeAll(instances);
   }
 
-  private synchronized void registerResources(Collection<String> resources) throws
JMException {
-    for (String resourceName : resources) {
-      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
-      if (monitor != null) {
-        monitor.register();
+  private void unregisterResources(Collection<String> resources) {
+    synchronized (_resourceMonitorMap) {
+      for (String resourceName : resources) {
+        ResourceMonitor monitor = _resourceMonitorMap.get(resourceName);
+        if (monitor != null) {
+          monitor.unregister();
+        }
       }
+      _resourceMonitorMap.keySet().removeAll(resources);
     }
   }
 
-  private synchronized void unregisterResources(Collection<String> resources) throws
MalformedObjectNameException {
-    for (String resourceName : resources) {
-      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
-      if (monitor != null) {
+  private void unregisterAllEventMonitors() {
+    synchronized (_clusterEventMonitorMap) {
+      for (ClusterEventMonitor monitor : _clusterEventMonitorMap.values()) {
         monitor.unregister();
       }
+      _clusterEventMonitorMap.clear();
     }
-    _resourceMbeanMap.keySet().removeAll(resources);
   }
 
-  private synchronized void unregisterEventMonitors(Collection<ClusterEventMonitor>
monitors)
+  private void registerPerInstanceResources(Collection<PerInstanceResourceMonitor>
monitors)
       throws MalformedObjectNameException {
-    for (ClusterEventMonitor monitor : monitors) {
-      monitor.unregister();
+    synchronized (_perInstanceResourceMonitorMap) {
+      for (PerInstanceResourceMonitor monitor : monitors) {
+        String instanceName = monitor.getInstanceName();
+        String resourceName = monitor.getResourceName();
+        String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+        register(monitor, getObjectName(beanName));
+        _perInstanceResourceMonitorMap
+            .put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+      }
     }
-    _resourceMbeanMap.keySet().removeAll(monitors);
   }
 
-  private synchronized void registerPerInstanceResources(Collection<PerInstanceResourceMonitor>
monitors)
+  private void unregisterAllPerInstanceResources()
       throws MalformedObjectNameException {
-    for (PerInstanceResourceMonitor monitor : monitors) {
-      String instanceName = monitor.getInstanceName();
-      String resourceName = monitor.getResourceName();
-      String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
-      register(monitor, getObjectName(beanName));
-      _perInstanceResourceMap.put(
-          new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+    synchronized (_perInstanceResourceMonitorMap) {
+      unregisterPerInstanceResources(_perInstanceResourceMonitorMap.keySet());
     }
   }
 
-  private synchronized void unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName>
beanNames)
+  private void unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName>
beanNames)
       throws MalformedObjectNameException {
-    for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
-      unregister(getObjectName(
-          getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+    synchronized (_perInstanceResourceMonitorMap) {
+      for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+        unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
beanName.resourceName())));
+      }
+      _perInstanceResourceMonitorMap.keySet().removeAll(beanNames);
     }
-    _perInstanceResourceMap.keySet().removeAll(beanNames);
   }
 
-  private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor) throws MalformedObjectNameException
{
+  private void registerWorkflow(WorkflowMonitor workflowMonitor) throws MalformedObjectNameException
{
     String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
     register(workflowMonitor, getObjectName(workflowBeanName));
   }
 
-  private synchronized void unregisterWorkflows(Collection<String> workflowMonitors)
+  private void unregisterAllWorkflowsMonitor()
       throws MalformedObjectNameException {
-    for (String workflowMonitor : workflowMonitors) {
-      String workflowBeanName = getWorkflowBeanName(workflowMonitor);
-      unregister(getObjectName(workflowBeanName));
-      _perTypeWorkflowMonitorMap.remove(workflowMonitor);
+    synchronized (_perTypeWorkflowMonitorMap) {
+      Iterator<Map.Entry<String, WorkflowMonitor>> workflowIter =
+          _perTypeWorkflowMonitorMap.entrySet().iterator();
+      while (workflowIter.hasNext()) {
+        Map.Entry<String, WorkflowMonitor> workflowEntry = workflowIter.next();
+        unregister(getObjectName(getWorkflowBeanName(workflowEntry.getKey())));
+        workflowIter.remove();
+      }
     }
   }
 
-  private synchronized void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException
{
+  private void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
     String jobBeanName = getJobBeanName(jobMonitor.getJobType());
     register(jobMonitor, getObjectName(jobBeanName));
   }
 
-  private synchronized void unregisterJobs(Collection<String> jobMonitors) throws MalformedObjectNameException
{
-    for (String jobMonitor : jobMonitors) {
-      String jobBeanName = getJobBeanName(jobMonitor);
-      unregister(getObjectName(jobBeanName));
-      _perTypeJobMonitorMap.remove(jobMonitor);
+  private void unregisterAllJobs() throws MalformedObjectNameException {
+    synchronized (_perTypeJobMonitorMap) {
+      Iterator<Map.Entry<String, JobMonitor>> jobIter = _perTypeJobMonitorMap.entrySet().iterator();
+      while (jobIter.hasNext()) {
+        Map.Entry<String, JobMonitor> jobEntry = jobIter.next();
+        unregister(getObjectName(getJobBeanName(jobEntry.getKey())));
+        jobIter.remove();
+      }
     }
   }
 
   // For test only
   protected ResourceMonitor getResourceMonitor(String resourceName) {
-    return _resourceMbeanMap.get(resourceName);
+    return _resourceMonitorMap.get(resourceName);
   }
 
   public String clusterBeanName() {
@@ -848,13 +904,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   @Override
   public long getTotalResourceGauge() {
-    return _resourceMbeanMap.size();
+    return _resourceMonitorMap.size();
   }
 
   @Override
   public long getTotalPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getPartitionGauge();
     }
     return total;
@@ -863,7 +919,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getErrorPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getErrorPartitionGauge();
     }
     return total;
@@ -872,7 +928,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingTopStatePartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingTopStatePartitionGauge();
     }
     return total;
@@ -881,7 +937,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingMinActiveReplicaPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingMinActiveReplicaPartitionGauge();
     }
     return total;
@@ -890,7 +946,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingReplicaPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingReplicaPartitionGauge();
     }
     return total;
@@ -899,7 +955,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getDifferenceWithIdealStateGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getDifferenceWithIdealStateGauge();
     }
     return total;
@@ -908,7 +964,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getStateTransitionCounter() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getTotalMessageReceived();
     }
     return total;
@@ -917,7 +973,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getPendingStateTransitionGuage() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getNumPendingStateTransitionGauge();
     }
     return total;

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index bd910e9..391bdd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -243,8 +243,8 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
-            "Distributed controller should have 2 handler (message) after lose leadership, but was "
+            1,
+            "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
 
     // clean up

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index 0a1b9fb..909e44c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -146,7 +146,7 @@ public class TestDistributedControllerManager extends ZkTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
+            1,
             "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index 5f0029f..7966f03 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -49,8 +49,8 @@ public class TestClusterEventStatusMonitor {
       super(clusterName);
       active();
     }
-    public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() {
-      return _clusterEventMbeanMap;
+    public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMonitors() {
+      return _clusterEventMonitorMap;
     }
   }
 
@@ -149,11 +149,11 @@ public class TestClusterEventStatusMonitor {
 
   private void addTestEventMonitor(ClusterStatusMonitorForTest monitor, String phaseName) throws
       JMException {
-    ConcurrentHashMap<String, ClusterEventMonitor> mbean = monitor.getClusterEventMBean();
+    ConcurrentHashMap<String, ClusterEventMonitor> monitors = monitor.getClusterEventMonitors();
     ClusterEventMonitor eventMonitor = new ClusterEventMonitor(monitor, phaseName,
         TEST_SLIDING_WINDOW_MS);
     eventMonitor.register();
-    mbean.put(phaseName, eventMonitor);
+    monitors.put(phaseName, eventMonitor);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 7eb2b8d..6156666 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -233,7 +233,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount - 2);
       }
-    }, 10000));
+    }, 3000));
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -247,8 +247,13 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     }
     firstController.disconnect();
 
+    ZkHelixClusterVerifier controllerClusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(_controllerClusterName).setZkClient(_gZkClient)
+            .build();
+    Assert.assertTrue(controllerClusterVerifier.verifyByPolling(), "Controller cluster was not converged");
+
     // 1 controller goes away
-    // 1 message queue mbean, 1 PerInstanceResource mbean, and one message queue mbean
+    // 1 message queue mbean, 1 PerInstanceResource mbean, and one event mbean
     final int previousMBeanCount2 = mbeans.size();
     Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
       @Override public boolean verify() throws Exception {
@@ -258,7 +263,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount2 - 3);
       }
-    }, 10000));
+    }, 5000));
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -276,7 +281,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount3 + 2);
       }
-    }, 10000));
+    }, 3000));
 
     // Add a resource
     // Register 1 resource mbean
@@ -299,7 +304,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount4 + _participants.length + 1);
       }
-    }, 10000));
+    }, 3000));
 
     // Remove a resource
     // No change in instance/resource mbean
@@ -315,7 +320,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount5 - (_participants.length + 1));
       }
-    }, 10000));
+    }, 3000));
 
     // Cleanup controllers then MBeans should all be removed.
     cleanupControllers();
@@ -331,6 +336,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         return ManagementFactory.getPlatformMBeanServer()
             .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty();
       }
-    }, 10000));
+    }, 3000));
   }
 }


Mime
View raw message