helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 42/44: Refactor StateTransitionStatMonitor extends DynamicMbean
Date Sat, 25 May 2019 01:20:16 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 88e2188f7e8af41a0b4d2329006beb0e9230a817
Author: Junkai Xue <jxue@linkedin.com>
AuthorDate: Wed May 15 08:28:16 2019 -0700

    Refactor StateTransitionStatMonitor extends DynamicMbean
    
    To support per-state-transition latency, the first step is to change StateTransitionStatMonitor to extend DynamicMBeanProvider.
    
    RB=1671496
    
    RB=1671496
    
    RB=1671496
    BUG=HELIX-1890
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../apache/helix/monitoring/mbeans/JobMonitor.java |  17 ---
 .../mbeans/ParticipantStatusMonitor.java           |   5 +-
 .../mbeans/StateTransitionStatMonitor.java         | 169 +++++++--------------
 .../mbeans/dynamicMBeans/DynamicMBeanProvider.java |  16 ++
 .../helix/monitoring/TestParticipantMonitor.java   |   8 +-
 5 files changed, 78 insertions(+), 137 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 6589e96..0c4aad4 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -204,21 +204,4 @@ public class JobMonitor extends DynamicMBeanProvider {
     doRegister(attributeList, _initObjectName);
     return this;
   }
-
-  /**
-   * NOTE: This method is not thread-safe nor atomic.
-   * Increment the value of a given SimpleDynamicMetric by 1.
-   */
-  private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) {
-    metric.updateValue(metric.getValue() + 1);
-  }
-
-  /**
-   * NOTE: This method is not thread-safe nor atomic.
-   * Increment the value of a given SimpleDynamicMetric by 1.
-   */
-  private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) {
-    metric.updateValue(metric.getValue() + value);
-  }
-
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 9eb29ff..f2fe72d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -101,10 +101,9 @@ public class ParticipantStatusMonitor {
         synchronized (this) {
           if (!_monitorMap.containsKey(cxt)) {
             StateTransitionStatMonitor bean =
-                new StateTransitionStatMonitor(cxt);
+                new StateTransitionStatMonitor(cxt, getObjectName(cxt.toString()));
             _monitorMap.put(cxt, bean);
-            String beanName = cxt.toString();
-            register(bean, getObjectName(beanName));
+            bean.register();
           }
         }
       }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
index b1e93e6..93ace95 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -19,37 +19,70 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
 import org.apache.helix.monitoring.StatCollector;
 import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // TODO convert StateTransitionStatMonitor to extends DynamicMBeanProvider.
 // Note this might change the attributes name.
-public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean {
+public class StateTransitionStatMonitor extends DynamicMBeanProvider {
   private static final Logger _logger = LoggerFactory.getLogger(StateTransitionStatMonitor.class);
-  public enum LATENCY_TYPE {
-    TOTAL,
-    EXECUTION,
-    MESSAGE
-  }
+  private List<DynamicMetric<?, ?>> _attributeList;
+  // For registering dynamic metrics
+  private final ObjectName _initObjectName;
 
-  private long _numDataPoints;
-  private long _successCount;
+  private SimpleDynamicMetric<Long> _totalStateTransitionCounter;
+  private SimpleDynamicMetric<Long> _totalFailedTransitionCounter;
+  private SimpleDynamicMetric<Long> _totalSuccessTransitionCounter;
 
-  private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = new ConcurrentHashMap<>();
+  private HistogramDynamicMetric _transitionLatencyGauge;
+  private HistogramDynamicMetric _transitionExecutionLatencyGauge;
+  private HistogramDynamicMetric _transitionMessageLatency;
 
   StateTransitionContext _context;
 
-  public StateTransitionStatMonitor(StateTransitionContext context) {
+  public StateTransitionStatMonitor(StateTransitionContext context, ObjectName objectName) {
     _context = context;
-    for (LATENCY_TYPE type : LATENCY_TYPE.values()) {
-      _monitorMap.put(type, new StatCollector());
-    }
-    reset();
+    _initObjectName = objectName;
+    _attributeList = new ArrayList<>();
+    _totalStateTransitionCounter = new SimpleDynamicMetric<>("TotalStateTransitionCounter", 0L);
+    _totalFailedTransitionCounter = new SimpleDynamicMetric<>("TotalFailedTransitionCounter", 0L);
+    _totalSuccessTransitionCounter = new SimpleDynamicMetric<>("TotalSuccessTransitionCounter", 0L);
+
+    _transitionLatencyGauge = new HistogramDynamicMetric("TransitionLatencyGauge", new Histogram(
+        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _transitionExecutionLatencyGauge = new HistogramDynamicMetric("TransitionExecutionLatencyGauge",
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _transitionMessageLatency = new HistogramDynamicMetric("TransitionMessageLatencyGauge",
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    _attributeList.add(_totalStateTransitionCounter);
+    _attributeList.add(_totalFailedTransitionCounter);
+    _attributeList.add(_totalSuccessTransitionCounter);
+    _attributeList.add(_transitionLatencyGauge);
+    _attributeList.add(_transitionExecutionLatencyGauge);
+    _attributeList.add(_transitionMessageLatency);
+    doRegister(_attributeList, _initObjectName);
+    return this;
   }
 
   public StateTransitionContext getContext() {
@@ -62,105 +95,15 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe
   }
 
   public void addDataPoint(StateTransitionDataPoint data) {
-    _numDataPoints++;
+    incrementSimpleDynamicMetric(_totalStateTransitionCounter);
     if (data.getSuccess()) {
-      _successCount++;
+      incrementSimpleDynamicMetric(_totalSuccessTransitionCounter);
+    } else {
+      incrementSimpleDynamicMetric(_totalFailedTransitionCounter);
     }
-    addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
-    addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
-    addLatency(LATENCY_TYPE.MESSAGE, data.getMessageLatency());
-  }
 
-  private void addLatency(LATENCY_TYPE type, double latency) {
-    if (latency < 0) {
-      _logger.warn("Ignore negative latency data {} for type {}.", latency, type.name());
-      return;
-    }
-    assert(_monitorMap.containsKey(type));
-    _monitorMap.get(type).addData(latency);
-  }
-
-  public void reset() {
-    _numDataPoints = 0;
-    _successCount = 0;
-    for (StatCollector monitor : _monitorMap.values()) {
-      monitor.reset();
-    }
-  }
-
-  @Override
-  public long getTotalStateTransitionGauge() {
-    return _numDataPoints;
-  }
-
-  @Override
-  public long getTotalFailedTransitionGauge() {
-    return _numDataPoints - _successCount;
-  }
-
-  @Override
-  public long getTotalSuccessTransitionGauge() {
-    return _successCount;
-  }
-
-  @Override
-  public double getMeanTransitionLatency() {
-    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMean();
-  }
-
-  @Override
-  public double getMaxTransitionLatency() {
-    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMax();
-  }
-
-  @Override
-  public double getMinTransitionLatency() {
-    return _monitorMap.get(LATENCY_TYPE.TOTAL).getMin();
+    _transitionLatencyGauge.updateValue(data.getTotalDelay());
+    _transitionExecutionLatencyGauge.updateValue(data.getExecutionDelay());
+    _transitionMessageLatency.updateValue(data.getMessageLatency());
   }
-
-  @Override
-  public double getPercentileTransitionLatency(int percentage) {
-    return _monitorMap.get(LATENCY_TYPE.TOTAL).getPercentile(percentage);
-  }
-
-  @Override
-  public double getMeanTransitionExecuteLatency() {
-    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMean();
-  }
-
-  @Override
-  public double getMaxTransitionExecuteLatency() {
-    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMax();
-  }
-
-  @Override
-  public double getMinTransitionExecuteLatency() {
-    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getMin();
-  }
-
-  @Override
-  public double getPercentileTransitionExecuteLatency(int percentage) {
-    return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
-  }
-
-  @Override
-  public double getMeanTransitionMessageLatency() {
-    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMean();
-  }
-
-  @Override
-  public double getMaxTransitionMessageLatency() {
-    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMax();
-  }
-
-  @Override
-  public double getMinTransitionMessageLatency() {
-    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMin();
-  }
-
-  @Override
-  public double getPercentileTransitionMessageLatency(int percentage) {
-    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getPercentile(percentage);
-  }
-
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 4299159..59adf67 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -210,4 +210,20 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
     // No operation supported
     return null;
   }
+
+  /**
+   * NOTE: This method is not thread-safe nor atomic.
+   * Increment the value of a given SimpleDynamicMetric by 1.
+   */
+  protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) {
+    incrementSimpleDynamicMetric(metric, 1);
+  }
+
+  /**
+   * NOTE: This method is not thread-safe nor atomic.
+   * Increment the value of a given SimpleDynamicMetric with input value.
+   */
+  protected void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) {
+    metric.updateValue(metric.getValue() + value);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index d7aed6e..bc9de01 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -124,13 +124,13 @@ public class TestParticipantMonitor {
 
     // Note the values in listener's map is the snapshot when the MBean is detected.
     Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
-        .get("MeanTransitionLatency"), 2000.0);
+        .get("TransitionLatencyGauge.Mean"), 2000.0);
     Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
-        .get("MeanTransitionExecuteLatency"), 1100.0);
+        .get("TransitionExecutionLatencyGauge.Mean"), 1100.0);
     Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
-        .get("MeanTransitionMessageLatency"), 600.0);
+        .get("TransitionMessageLatencyGauge.Mean"), 600.0);
     Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
-        .get("TotalStateTransitionGauge"), 2L);
+        .get("TotalStateTransitionCounter"), 2L);
 
     data = new StateTransitionDataPoint(2000, 500, 600, true);
     monitor.reportTransitionStat(cxt, data);


Mime
View raw message