helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhangm...@apache.org
Subject [helix] branch master updated: Change participant message monitor from static metric to dynamic metric (#1696)
Date Fri, 16 Apr 2021 16:08:03 GMT
This is an automated email from the ASF dual-hosted git repository.

zhangmeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c3c24  Change participant message monitor from static metric to dynamic metric (#1696)
24c3c24 is described below

commit 24c3c24b35ce39784343573c9561e3d808adc1c6
Author: Meng Zhang <mnzhang@linkedin.com>
AuthorDate: Fri Apr 16 09:07:52 2021 -0700

    Change participant message monitor from static metric to dynamic metric (#1696)
    
    Co-authored-by: Meng Zhang <mnzhang@mnzhang-mn2.linkedin.biz>
---
 .../monitoring/mbeans/MessageLatencyMonitor.java   |   2 +-
 .../mbeans/ParticipantMessageMonitor.java          |  97 ++++++++++---------
 .../mbeans/ParticipantMessageMonitorMBean.java     |  31 ------
 .../mbeans/ParticipantStatusMonitor.java           |  62 +++++-------
 .../helix/monitoring/TestParticipantMonitor.java   | 107 ++++++++++++++++++---
 5 files changed, 172 insertions(+), 127 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index 9f2e41c..c180953 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -74,7 +74,7 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider {
     attributeList.add(_totalMessageCount);
     attributeList.add(_totalMessageLatency);
     attributeList.add(_messageLatencyGauge);
-    doRegister(attributeList, MBEAN_DESCRIPTION, _domainName, ParticipantMessageMonitor.PARTICIPANT_KEY,
+    doRegister(attributeList, MBEAN_DESCRIPTION, _domainName, ParticipantStatusMonitor.PARTICIPANT_KEY,
         _participantName, "MonitorType", MessageLatencyMonitor.class.getSimpleName());
 
     return this;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index 261790d..83aa3e2 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -19,10 +19,27 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
-  public static final String PARTICIPANT_KEY = "ParticipantName";
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+public class ParticipantMessageMonitor extends DynamicMBeanProvider {
+  private static final String MBEAN_DESCRIPTION = "Helix Participant Message Monitor";
+  private final String _domainName;
   public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
 
+  private final String _participantName;
+
+  private SimpleDynamicMetric<Long> _receivedMessages;
+  private SimpleDynamicMetric<Long> _discardedMessages;
+  private SimpleDynamicMetric<Long> _completedMessages;
+  private SimpleDynamicMetric<Long> _failedMessages;
+  private SimpleDynamicMetric<Long> _pendingMessages;
+
   /**
    * The current processed state of the message
    */
@@ -32,68 +49,38 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
     COMPLETED
   }
 
-  private final String _participantName;
-  private long _receivedMessages = 0;
-  private long _discardedMessages = 0;
-  private long _completedMessages = 0;
-  private long _failedMessages = 0;
-  private long _pendingMessages = 0;
-
-  public ParticipantMessageMonitor(String participantName) {
+  public ParticipantMessageMonitor(String domainName, String participantName) {
+    _domainName = domainName;
     _participantName = participantName;
+    _receivedMessages = new SimpleDynamicMetric("ReceivedMessages", 0L);
+    _discardedMessages = new SimpleDynamicMetric("DiscardedMessages", 0L);
+    _completedMessages = new SimpleDynamicMetric("CompletedMessages", 0L);
+    _failedMessages = new SimpleDynamicMetric("FailedMessages", 0L);
+    _pendingMessages = new SimpleDynamicMetric("PendingMessages", 0L);
   }
 
-  public String getParticipantBeanName() {
-    return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
-  }
-
-  public void incrementReceivedMessages(int count) {
-    _receivedMessages += count;
+  public void incrementReceivedMessages(long count) {
+    incrementSimpleDynamicMetric(_receivedMessages, count);
   }
 
   public void incrementDiscardedMessages(int count) {
-    _discardedMessages += count;
+    incrementSimpleDynamicMetric(_discardedMessages, count);
   }
 
   public void incrementCompletedMessages(int count) {
-    _completedMessages += count;
+    incrementSimpleDynamicMetric(_completedMessages, count);
   }
 
   public void incrementFailedMessages(int count) {
-    _failedMessages += count;
+    incrementSimpleDynamicMetric(_failedMessages, count);
   }
 
   public void incrementPendingMessages(int count) {
-    _pendingMessages += count;
+    incrementSimpleDynamicMetric(_pendingMessages, count);
   }
 
   public void decrementPendingMessages(int count) {
-    _pendingMessages -= count;
-  }
-
-  @Override
-  public long getReceivedMessages() {
-    return _receivedMessages;
-  }
-
-  @Override
-  public long getDiscardedMessages() {
-    return _discardedMessages;
-  }
-
-  @Override
-  public long getCompletedMessages() {
-    return _completedMessages;
-  }
-
-  @Override
-  public long getFailedMessages() {
-    return _failedMessages;
-  }
-
-  @Override
-  public long getPendingMessages() {
-    return _pendingMessages;
+    incrementSimpleDynamicMetric(_pendingMessages, -1 * count);
   }
 
   @Override
@@ -101,4 +88,22 @@ public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean
     return PARTICIPANT_STATUS_KEY;
   }
 
+  /**
+   * This method registers the dynamic metrics.
+   * @return
+   * @throws JMException
+   */
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_receivedMessages);
+    attributeList.add(_discardedMessages);
+    attributeList.add(_completedMessages);
+    attributeList.add(_failedMessages);
+    attributeList.add(_pendingMessages);
+    doRegister(attributeList, MBEAN_DESCRIPTION, _domainName,
+        ParticipantStatusMonitor.PARTICIPANT_KEY, _participantName, "MonitorType",
+        ParticipantMessageMonitor.class.getSimpleName());
+    return this;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
deleted file mode 100644
index d4a899f..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-
-public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
-  public long getReceivedMessages();
-  public long getDiscardedMessages();
-  public long getCompletedMessages();
-  public long getFailedMessages();
-  public long getPendingMessages();
-}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 6e5b346..14db1c7 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -36,25 +36,32 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ParticipantStatusMonitor {
+  public static final String PARTICIPANT_KEY = "ParticipantName";
+
   private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap =
       new ConcurrentHashMap<>();
   private static final Logger LOG = LoggerFactory.getLogger(ParticipantStatusMonitor.class);
 
+  private final String _instanceName;
   private MBeanServer _beanServer;
   private ParticipantMessageMonitor _messageMonitor;
   private MessageLatencyMonitor _messageLatencyMonitor;
   private Map<String, ThreadPoolExecutorMonitor> _executorMonitors;
 
   public ParticipantStatusMonitor(boolean isParticipant, String instanceName) {
+    _instanceName = instanceName;
     try {
       _beanServer = ManagementFactory.getPlatformMBeanServer();
       if (isParticipant) {
-        _messageMonitor = new ParticipantMessageMonitor(instanceName);
+        _messageMonitor =
+            new ParticipantMessageMonitor(MonitorDomainNames.CLMParticipantReport.name(),
+                _instanceName);
+        _messageMonitor.register();
         _messageLatencyMonitor =
-            new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName);
+            new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(),
+                _instanceName);
         _messageLatencyMonitor.register();
         _executorMonitors = new ConcurrentHashMap<>();
-        register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName()));
       }
     } catch (Exception e) {
       LOG.warn(e.toString());
@@ -115,42 +122,26 @@ public class ParticipantStatusMonitor {
   }
 
   private ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
+    return new ObjectName(
+        String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
   }
 
-  private void register(Object bean, ObjectName name) {
-    LOG.info("Registering bean: " + name.toString());
-    if (_beanServer == null) {
-      LOG.warn("bean server is null, skip reporting");
-      return;
-    }
-    try {
-      _beanServer.unregisterMBean(name);
-    } catch (Exception e1) {
-      // Swallow silently
-    }
-
-    try {
-      _beanServer.registerMBean(bean, name);
-    } catch (Exception e) {
-      LOG.warn("Could not register MBean", e);
-    }
+  /**
+   * Build participant bean name
+   * @param participantName
+   * @return participant bean name
+   */
+  protected String getParticipantBeanName(String participantName) {
+    return String.format("%s=%s", PARTICIPANT_KEY, participantName);
   }
 
   public void shutDown() {
-    if (_messageMonitor != null) {  // is participant
-      try {
-        ObjectName name = getObjectName(_messageMonitor.getParticipantBeanName());
-        if (_beanServer.isRegistered(name)) {
-          _beanServer.unregisterMBean(name);
-        }
-      } catch (Exception e) {
-        LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e);
-      }
-    }
     if (_messageLatencyMonitor != null) {
       _messageLatencyMonitor.unregister();
     }
+    if (_messageMonitor != null) {
+      _messageMonitor.unregister();
+    }
     for (StateTransitionContext cxt : _monitorMap.keySet()) {
       try {
         ObjectName name = getObjectName(cxt.toString());
@@ -168,16 +159,15 @@ public class ParticipantStatusMonitor {
     if (_executorMonitors == null) {
       return;
     }
-    if (! (executor instanceof ThreadPoolExecutor)) {
+    if (!(executor instanceof ThreadPoolExecutor)) {
       return;
     }
 
     try {
-      _executorMonitors.put(type,
-          new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor));
+      _executorMonitors
+          .put(type, new ThreadPoolExecutorMonitor(type, (ThreadPoolExecutor) executor));
     } catch (JMException e) {
-      LOG.warn(String.format(
-          "Error in creating ThreadPoolExecutorMonitor for type=%s", type), e);
+      LOG.warn(String.format("Error in creating ThreadPoolExecutorMonitor for type=%s", type), e);
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index f137df3..b761573 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -23,18 +23,23 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServerConnection;
 import javax.management.MBeanServerNotification;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,19 +49,22 @@ import org.testng.annotations.Test;
 public class TestParticipantMonitor {
   private static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class);
   private static String CLUSTER_NAME = TestHelper.getTestClassName();
+  private static final String PARTICIPANT_NAME = "participant_0";
+  private static final String DOMAIN_NAME = "CLMParticipantReport";
 
   class ParticipantMonitorListener extends ClusterMBeanObserver {
     Map<String, Map<String, Object>> _beanValueMap = new HashMap<>();
 
-    public ParticipantMonitorListener(String domain) throws IOException, InstanceNotFoundException {
+    public ParticipantMonitorListener(String domain, String key, String value)
+        throws IOException, InstanceNotFoundException {
       super(domain);
-      init();
+      init(key, value);
     }
 
-    void init() {
+    void init(String key, String value) {
       try {
         Set<ObjectInstance> existingInstances =
-            _server.queryMBeans(new ObjectName(_domain + ":Cluster=" + CLUSTER_NAME + ",*"), null);
+            _server.queryMBeans(new ObjectName(_domain + ":" + key + "=" + value + ",*"), null);
         for (ObjectInstance instance : existingInstances) {
           String mbeanName = instance.getObjectName().toString();
           // System.out.println("mbeanName: " + mbeanName);
@@ -102,15 +110,17 @@ public class TestParticipantMonitor {
   }
 
   @Test()
-  public void testReportData()
+  public void testReportStateTransitionData()
       throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
-      IOException, InterruptedException {
-    System.out.println("START TestParticipantMonitor");
+             IOException, InterruptedException, MBeanException, AttributeNotFoundException,
+             ReflectionException {
+    System.out.println("START TestParticipantStateTransitionMonitor");
     ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false, null);
 
     int monitorNum = 0;
 
-    StateTransitionContext cxt = new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b");
+    StateTransitionContext cxt =
+        new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b");
     StateTransitionDataPoint data = new StateTransitionDataPoint(2000, 1000, 600, true);
     monitor.reportTransitionStat(cxt, data);
 
@@ -118,7 +128,7 @@ public class TestParticipantMonitor {
     monitor.reportTransitionStat(cxt, data);
 
     ParticipantMonitorListener monitorListener =
-        new ParticipantMonitorListener("CLMParticipantReport");
+        new ParticipantMonitorListener(DOMAIN_NAME, "Cluster", CLUSTER_NAME);
     Thread.sleep(1000);
     Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
 
@@ -138,7 +148,8 @@ public class TestParticipantMonitor {
     Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
 
     data = new StateTransitionDataPoint(1000, 500, 300, true);
-    StateTransitionContext cxt2 = new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b");
+    StateTransitionContext cxt2 =
+        new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b");
     monitor.reportTransitionStat(cxt2, data);
     monitor.reportTransitionStat(cxt2, data);
     Thread.sleep(1000);
@@ -147,12 +158,13 @@ public class TestParticipantMonitor {
 
     Assert.assertTrue(cxt.equals(cxt2));
     Assert.assertFalse(cxt.equals(new Object()));
-    Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b")));
+    Assert.assertTrue(
+        cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b")));
 
     cxt2.getInstanceName();
 
     ParticipantMonitorListener monitorListener2 =
-        new ParticipantMonitorListener("CLMParticipantReport");
+        new ParticipantMonitorListener(DOMAIN_NAME, "Cluster", CLUSTER_NAME);
 
     Thread.sleep(1000);
     // Same here. Helix only measures per cluster + per state transitions.
@@ -160,6 +172,75 @@ public class TestParticipantMonitor {
 
     monitorListener2.disconnect();
     monitorListener.disconnect();
-    System.out.println("END TestParticipantMonitor");
+
+    System.out.println("END TestParticipantStateTransitionMonitor");
+  }
+
+  @Test()
+  public void testReportMessageData()
+      throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
+             IOException, InterruptedException, MBeanException, AttributeNotFoundException,
+             ReflectionException {
+    System.out.println("START TestParticipantMessageMonitor");
+    ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(true, PARTICIPANT_NAME);
+
+    Message message = new Message(Message.MessageType.NO_OP, "0");
+    monitor.reportReceivedMessage(message);
+    Thread.sleep(1000);
+    ParticipantMonitorListener monitorListener =
+        new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName", PARTICIPANT_NAME);
+    Thread.sleep(1000);
+    Assert.assertEquals(monitorListener._beanValueMap.size(), 2);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("ReceivedMessages"), 1L);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("PendingMessages"), 1L);
+
+    monitor
+        .reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
+    Thread.sleep(1000);
+    monitorListener =
+        new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName", PARTICIPANT_NAME);
+    Thread.sleep(1000);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("ReceivedMessages"), 1L);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("PendingMessages"), 0L);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("CompletedMessages"), 1L);
+
+    monitor.reportReceivedMessage(message);
+    Thread.sleep(1000);
+    monitorListener =
+        new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName", PARTICIPANT_NAME);
+    Thread.sleep(1000);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("ReceivedMessages"), 2L);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("PendingMessages"), 1L);
+
+    monitor
+        .reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+    Thread.sleep(1000);
+    monitorListener =
+        new ParticipantMonitorListener(DOMAIN_NAME, "ParticipantName", PARTICIPANT_NAME);
+    Thread.sleep(1000);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("DiscardedMessages"), 1L);
+    Assert.assertEquals(monitorListener._beanValueMap.get(
+        getObjectName("ParticipantName=participant_0,MonitorType=ParticipantMessageMonitor")
+            .toString()).get("PendingMessages"), 0L);
+
+    monitorListener.disconnect();
+
+    System.out.println("END TestParticipantMessageMonitor");
   }
 }

Mime
View raw message