helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: [HELIX-697] Add cluster level metrics in ClusterStatusMonitor
Date Thu, 19 Apr 2018 21:03:24 GMT
Repository: helix
Updated Branches:
  refs/heads/master 074667363 -> e1faf2404


[HELIX-697] Add cluster level metrics in ClusterStatusMonitor


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e1faf240
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e1faf240
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e1faf240

Branch: refs/heads/master
Commit: e1faf2404c3bb74aab7c402d76246b41af74fd16
Parents: 0746673
Author: Hunter Lee <narendly@gmail.com>
Authored: Thu Apr 19 13:33:54 2018 -0700
Committer: Hunter Lee <narendly@gmail.com>
Committed: Thu Apr 19 13:34:11 2018 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java |  47 +++-
 .../mbeans/ClusterStatusMonitorMBean.java       |  59 +++--
 .../monitoring/mbeans/ResourceMonitor.java      | 120 +++++++----
 .../dynamicMBeans/DynamicMBeanProvider.java     |   4 +-
 .../mbeans/TestClusterAggregateMetrics.java     | 214 +++++++++++++++++++
 5 files changed, 384 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index c7e0fdb..954ae7d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -50,7 +50,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
   static final String JOB_TYPE_DN_KEY = "jobType";
   static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT";
-
   public static final String DEFAULT_TAG = "DEFAULT";
 
   private final String _clusterName;
@@ -69,9 +68,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
+  // Aggregate metrics from ResourceMonitors
+  private volatile long _totalPartitionCount = 0;
+  private volatile long _totalErrorPartitionCount = 0;
+  private volatile long _totalPartitionsWithoutTopStateCount = 0;
+  private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0;
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new
ConcurrentHashMap<>();
-
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new
ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
@@ -457,7 +460,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
-                new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
+                new ResourceMonitor(this, _clusterName, resourceName, getObjectName(beanName));
             bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
@@ -777,7 +780,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _inMaintenance = inMaintenance;
   }
 
-
   @Override
   public long getPaused() {
     return _paused ? 1 : 0;
@@ -799,4 +801,41 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public long getRebalanceFailureCounter() {
     return _rebalanceFailureCount.get();
   }
+
+  @Override
+  public long getTotalPartitionCount() {
+    return _totalPartitionCount;
+  }
+
+  @Override
+  public long getTotalErrorPartitionCount() {
+    return _totalErrorPartitionCount;
+  }
+
+  @Override
+  public long getTotalPartitionsWithoutTopStateCount() {
+    return _totalPartitionsWithoutTopStateCount;
+  }
+
+  @Override
+  public long getTotalExternalViewIdealStateMismatchPartitionCount() {
+    return _totalExternalViewIdealStateMismatchPartitionCount;
+  }
+
+  synchronized void applyDeltaToTotalPartitionCount(long delta) {
+    _totalPartitionCount += delta;
+  }
+
+  synchronized void applyDeltaToTotalErrorPartitionCount(long delta) {
+    _totalErrorPartitionCount += delta;
+  }
+
+  synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long delta) {
+    _totalPartitionsWithoutTopStateCount += delta;
+  }
+
+  synchronized void applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta)
{
+    _totalExternalViewIdealStateMismatchPartitionCount += delta;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 49d316e..6ace495 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -22,49 +22,82 @@ package org.apache.helix.monitoring.mbeans;
 import org.apache.helix.monitoring.SensorNameProvider;
 
 public interface ClusterStatusMonitorMBean extends SensorNameProvider {
-  public long getDownInstanceGauge();
 
-  public long getInstancesGauge();
+  /**
+   * @return number of instances that are down (non-live instances)
+   */
+  long getDownInstanceGauge();
+
+  /**
+   * @return total number of instances
+   */
+  long getInstancesGauge();
 
-  public long getDisabledInstancesGauge();
+  /**
+   * @return number of disabled instances
+   */
+  long getDisabledInstancesGauge();
 
-  public long getDisabledPartitionsGauge();
+  /**
+   * @return number of disabled partitions
+   */
+  long getDisabledPartitionsGauge();
 
-  public long getRebalanceFailureGauge();
+  /**
+   * @return 1 if rebalance failed; 0 if rebalance did not fail
+   */
+  long getRebalanceFailureGauge();
 
   /**
    * The max message queue size across all instances including controller
    * @return
    */
-  public long getMaxMessageQueueSizeGauge();
+  long getMaxMessageQueueSizeGauge();
 
   /**
    * The sum of all message queue sizes for instances in this cluster
    * @return
    */
-  public long getInstanceMessageQueueBacklog();
+  long getInstanceMessageQueueBacklog();
 
   /**
    * @return 1 if cluster is enabled, otherwise 0
    */
-  public long getEnabled();
+  long getEnabled();
 
   /**
-   *
    * @return 1 if cluster is in maintenance mode, otherwise 0
    */
-  public long getMaintenance();
-
+  long getMaintenance();
 
   /**
-   *
    * @return 1 if cluster is paused, otherwise 0
    */
-  public long getPaused();
+  long getPaused();
 
   /**
    * The number of failures during rebalance pipeline.
    * @return
    */
   long getRebalanceFailureCounter();
+
+  /**
+   * @return number of all partitions in this cluster
+   */
+  long getTotalPartitionCount();
+
+  /**
+   * @return number of all partitions in this cluster that have errors
+   */
+  long getTotalErrorPartitionCount();
+
+  /**
+   * @return number of all partitions in this cluster without any top-state replicas
+   */
+  long getTotalPartitionsWithoutTopStateCount();
+
+  /**
+   * @return number of all partitions in this cluster whose ExternalView and IdealState have
discrepancies
+   */
+  long getTotalExternalViewIdealStateMismatchPartitionCount();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 3d5c579..125257b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -38,17 +38,17 @@ import java.util.*;
 public class ResourceMonitor extends DynamicMBeanProvider {
 
   // Gauges
-  private SimpleDynamicMetric<Integer> _numOfPartitions;
-  private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView;
-  private SimpleDynamicMetric<Integer> _numOfErrorPartitions;
-  private SimpleDynamicMetric<Integer> _numNonTopStatePartitions;
+  private SimpleDynamicMetric<Long> _numOfPartitions;
+  private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
+  private SimpleDynamicMetric<Long> _numOfErrorPartitions;
+  private SimpleDynamicMetric<Long> _numNonTopStatePartitions;
+  private SimpleDynamicMetric<Long> _externalViewIdealStateDiff;
   private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions;
   private SimpleDynamicMetric<Long> _numLessReplicaPartitions;
   private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions;
   private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
   private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
   private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
-  private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff;
 
   // Counters
   private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
@@ -63,6 +63,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private final String _resourceName;
   private final String _clusterName;
   private final ObjectName _initObjectName;
+  private ClusterStatusMonitor _clusterStatusMonitor;
 
   @Override
   public ResourceMonitor register() throws JMException {
@@ -85,16 +86,37 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_partitionTopStateHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     doRegister(attributeList, _initObjectName);
-
     return this;
   }
 
+  @Override
+  public synchronized void unregister() {
+    super.unregister();
+    // Also remove metrics propagated to aggregate metrics in ClusterStatusMonitor
+    if (_clusterStatusMonitor != null) {
+      _clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
+          -_externalViewIdealStateDiff.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue());
+    }
+  }
+
   public enum MonitorState {
     TOP_STATE
   }
 
-  public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
-      throws JMException {
+  public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName)
{
+    this(null, clusterName, resourceName, objectName);
+  }
+
+  public ResourceMonitor(ClusterStatusMonitor clusterStatusMonitor, String clusterName, String
resourceName,
+      ObjectName objectName) {
+    if (clusterStatusMonitor == null) {
+      _logger.warn("ResourceMonitor initialized without a reference to ClusterStatusMonitor
(null): metrics will not "
+          + "be aggregated at the cluster level.");
+    }
+    _clusterStatusMonitor = clusterStatusMonitor;
     _clusterName = clusterName;
     _resourceName = resourceName;
     _initObjectName = objectName;
@@ -130,9 +152,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   @Override
   public String getSensorName() {
-    return String
-        .format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag,
-            _resourceName);
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
_tag, _resourceName);
   }
 
   public long getPartitionGauge() {
@@ -199,30 +219,29 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
 
     resetGauges();
+
     if (idealState == null) {
-      _logger.warn("ideal state is null for " + _resourceName);
+      _logger.warn("ideal state is null for {}", _resourceName);
       return;
     }
 
     assert (_resourceName.equals(idealState.getId()));
     assert (_resourceName.equals(externalView.getId()));
 
-    int numOfErrorPartitions = 0;
-    int numOfDiff = 0;
-    int numOfPartitionWithTopState = 0;
+    long numOfErrorPartitions = 0;
+    long numOfDiff = 0;
+    long numOfPartitionWithTopState = 0;
 
     Set<String> partitions = idealState.getPartitionSet();
-
-    _numOfPartitions.updateValue(partitions.size());
-
     int replica;
     try {
       replica = Integer.valueOf(idealState.getReplicas());
     } catch (NumberFormatException e) {
-      _logger.info("Unspecified replica count for " + _resourceName + ", skip updating the
ResourceMonitor Mbean: " + idealState.getReplicas());
+      _logger.info("Unspecified replica count for {}, skip updating the ResourceMonitor Mbean:
{}", _resourceName,
+          idealState.getReplicas());
       return;
     } catch (Exception ex) {
-      _logger.warn("Failed to get replica count for " + _resourceName + ", cannot update
the ResourceMonitor Mbean.");
+      _logger.warn("Failed to get replica count for {}, cannot update the ResourceMonitor
Mbean.", _resourceName);
       return;
     }
 
@@ -273,9 +292,23 @@ public class ResourceMonitor extends DynamicMBeanProvider {
             .updateValue(_numLessMinActiveReplicaPartitions.getValue() + 1);
       }
     }
+
+    // Update cluster-level aggregate metrics in ClusterStatusMonitor
+    if (_clusterStatusMonitor != null) {
+      _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() - _numOfPartitions.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(
+          numOfErrorPartitions - _numOfErrorPartitions.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
+          numOfDiff - _externalViewIdealStateDiff.getValue());
+      _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(
+          (partitions.size() - numOfPartitionWithTopState) - _numNonTopStatePartitions.getValue());
+    }
+
+    // Update resource-level metrics
+    _numOfPartitions.updateValue((long) partitions.size());
     _numOfErrorPartitions.updateValue(numOfErrorPartitions);
     _externalViewIdealStateDiff.updateValue(numOfDiff);
-    _numOfPartitionsInExternalView.updateValue(externalView.getPartitionSet().size());
+    _numOfPartitionsInExternalView.updateValue((long) externalView.getPartitionSet().size());
     _numNonTopStatePartitions.updateValue(_numOfPartitions.getValue() - numOfPartitionWithTopState);
 
     String tag = idealState.getInstanceGroupTag();
@@ -287,10 +320,15 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   }
 
   private void resetGauges() {
-    _numOfErrorPartitions.updateValue(0);
-    _numNonTopStatePartitions.updateValue(0);
-    _externalViewIdealStateDiff.updateValue(0);
-    _numOfPartitionsInExternalView.updateValue(0);
+    // Disable reset for the following gauges:
+    // 1) Need the previous values for these gauges to compute delta for cluster-level metrics.
+    // 2) These four gauges are reset every time updateResource is called anyway.
+    //_numOfErrorPartitions.updateValue(0l);
+    //_numNonTopStatePartitions.updateValue(0l);
+    //_externalViewIdealStateDiff.updateValue(0l);
+    //_numOfPartitionsInExternalView.updateValue(0l);
+
+    // The following gauges are computed each call to updateResource by way of looping so
need to be reset.
     _numLessMinActiveReplicaPartitions.updateValue(0l);
     _numLessReplicaPartitions.updateValue(0l);
     _numPendingRecoveryRebalancePartitions.updateValue(0l);
@@ -301,23 +339,23 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded)
{
     switch (monitorState) {
-    case TOP_STATE:
-      if (succeeded) {
-        _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue()
+ 1);
-        _successfulTopStateHandoffDurationCounter
-            .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
-        _partitionTopStateHandoffDurationGauge.updateValue(duration);
-        if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
-          _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
-          _lastResetTime = System.currentTimeMillis();
+      case TOP_STATE:
+        if (succeeded) {
+          _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue()
+ 1);
+          _successfulTopStateHandoffDurationCounter
+              .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
+          _partitionTopStateHandoffDurationGauge.updateValue(duration);
+          if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
+            _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
+            _lastResetTime = System.currentTimeMillis();
+          }
+        } else {
+          _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue()
+ 1);
         }
-      } else {
-        _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue()
+ 1);
-      }
-      break;
-    default:
-      _logger.warn(
-          String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
+        break;
+      default:
+        _logger.warn(
+            String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index b44c63c..988ba9b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -134,7 +134,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   public abstract DynamicMBeanProvider register() throws JMException;
 
   /**
-   * After unregistered, the MBean can't be registered again, a new monitor has be to created.
+   * After unregistered, the MBean can't be registered again, a new monitor has to be created.
    */
   public synchronized void unregister() {
     MBeanRegistrar.unregister(_objectName);
@@ -176,7 +176,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   @Override
   public void setAttribute(Attribute attribute)
       throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException,
-      ReflectionException {
+             ReflectionException {
     // All MBeans are readonly
     return;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
new file mode 100644
index 0000000..ba3b654
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
@@ -0,0 +1,214 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.Query;
+import javax.management.QueryExp;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * This test specifically tests MBean metrics instrumented in ClusterStatusMonitor that aggregate
individual
+ * resource-level metrics into cluster-level figures.
+ *
+ * Sets up 3 Participants and 5 partitions with 3 replicas each, the test monitors the change
in the numbers
+ * when a Participant is disabled.
+ *
+ */
+public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
+
+  // Configurable values for test setup
+  private static final int NUM_PARTICIPANTS = 3;
+  private static final int NUM_PARTITIONS = 5;
+  private static final int NUM_REPLICAS = 3;
+
+  private static final String PARTITION_COUNT = "TotalPartitionCount";
+  private static final String ERROR_PARTITION_COUNT = "TotalErrorPartitionCount";
+  private static final String WITHOUT_TOPSTATE_COUNT = "TotalPartitionsWithoutTopStateCount";
+  private static final String IS_EV_MISMATCH_COUNT = "TotalExternalViewIdealStateMismatchPartitionCount";
+
+  private static final int START_PORT = 12918;
+  private static final String STATE_MODEL = "MasterSlave";
+  private static final String TEST_DB = "TestDB";
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private ClusterSetup _setupTool;
+  private HelixManager _manager;
+  private MockParticipantManager[] _participants = new MockParticipantManager[NUM_PARTICIPANTS];
+  private ClusterControllerManager _controller;
+  private Map<String, Object> _beanValueMap = new HashMap<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, STATE_MODEL);
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, NUM_REPLICAS);
+
+    // start dummy participants
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    // create cluster manager
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+  }
+
+  /**
+   * Shutdown order: 1) disconnect the controller 2) disconnect participants.
+   *
+   */
+  @AfterClass
+  public void afterClass() {
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+    }
+    if (_manager != null && _manager.isConnected()) {
+      _manager.disconnect();
+    }
+
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testAggregateMetrics() throws InterruptedException {
+    // Everything should be up and running initially with 5 total partitions
+    updateMetrics();
+    Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);
+    Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L);
+
+    // Disable all Participants (instances)
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
+    }
+    Thread.sleep(500);
+    updateMetrics();
+    Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);
+    Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 5L);
+    Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 5L);
+
+    // Re-enable all Participants (instances)
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, true);
+    }
+    Thread.sleep(500);
+    updateMetrics();
+    Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);
+    Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L);
+
+    // Drop the resource and check that all metrics are zero.
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
+    Thread.sleep(500);
+    updateMetrics();
+    Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L);
+    Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L);
+  }
+
+  /**
+   * Queries for all MBeans from the MBean Server and only looks at the relevant MBean and
gets its metric numbers.
+   *
+   */
+  private void updateMetrics() {
+    try {
+      QueryExp exp = Query.match(Query.attr("SensorName"), Query.value("*" + CLUSTER_NAME
+ "*"));
+      Set<ObjectInstance> mbeans =
+          new HashSet<>(ManagementFactory.getPlatformMBeanServer().queryMBeans(new
ObjectName("ClusterStatus:*"), exp));
+      for (ObjectInstance instance : mbeans) {
+        ObjectName beanName = instance.getObjectName();
+        if (beanName.toString().equals("ClusterStatus:cluster=" + CLUSTER_NAME)) {
+          MBeanInfo info = _server.getMBeanInfo(beanName);
+          MBeanAttributeInfo[] infos = info.getAttributes();
+          for (MBeanAttributeInfo infoItem : infos) {
+            Object val = _server.getAttribute(beanName, infoItem.getName());
+            _beanValueMap.put(infoItem.getName(), val);
+          }
+        }
+      }
+    } catch (Exception e) {
+      // update failed
+    }
+  }
+}


Mime
View raw message