helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [25/38] helix git commit: Add a messaging monitoring mbean to ParticipantStatusMonitor object.
Date Wed, 08 Feb 2017 18:00:00 GMT
Add a messaging monitoring mbean to ParticipantStatusMonitor object.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/092b73a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/092b73a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/092b73a7

Branch: refs/heads/helix-0.6.x
Commit: 092b73a753cdbf632d06a938519388e6825c088c
Parents: cddd9a6
Author: Boyan Li <boli@linkedin.com>
Authored: Wed Sep 14 15:29:54 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Wed Feb 8 09:53:12 2017 -0800

----------------------------------------------------------------------
 .../messaging/DefaultMessagingService.java      | 12 +++++++-
 .../messaging/handling/HelixTaskExecutor.java   | 13 ++++----
 .../monitoring/ParticipantStatusMonitor.java    | 24 ++++++++++++++-
 .../mbeans/ParticipantMessageMonitor.java       | 32 ++++++++++++++++++++
 .../mbeans/ParticipantMessageMonitorMBean.java  |  8 +++++
 .../src/test/java/org/apache/helix/Mocks.java   |  1 -
 .../controller/stages/DummyClusterManager.java  |  1 -
 .../monitoring/TestParticipantMonitor.java      |  2 +-
 .../helix/participant/MockZKHelixManager.java   |  1 -
 9 files changed, 81 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index f000f69..42764f3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -41,6 +41,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.monitoring.ParticipantStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class DefaultMessagingService implements ClusterMessagingService {
@@ -49,6 +50,8 @@ public class DefaultMessagingService implements ClusterMessagingService {
   private final HelixTaskExecutor _taskExecutor;
   // TODO:rename to factory, this is not a service
   private final AsyncCallbackService _asyncCallbackService;
+  private final ParticipantStatusMonitor _participantStatusMonitor;
+
   private static Logger _logger = Logger.getLogger(DefaultMessagingService.class);
   ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
       new ConcurrentHashMap<String, MessageHandlerFactory>();
@@ -56,7 +59,14 @@ public class DefaultMessagingService implements ClusterMessagingService {
   public DefaultMessagingService(HelixManager manager) {
     _manager = manager;
     _evaluator = new CriteriaEvaluator();
-    _taskExecutor = new HelixTaskExecutor(this);
+
+    boolean isParticipant = false;
+    if (manager.getInstanceType() == InstanceType.PARTICIPANT || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+      isParticipant = true;
+    }
+    _participantStatusMonitor = new ParticipantStatusMonitor(isParticipant, manager.getInstanceName());
+
+    _taskExecutor = new HelixTaskExecutor(_participantStatusMonitor);
     _asyncCallbackService = new AsyncCallbackService();
     _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
         _asyncCallbackService);

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 188992b..ea3646d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -109,7 +109,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public static final String MAX_THREADS = "maxThreads";
 
   private MessageQueueMonitor _messageQueueMonitor;
-  private ClusterMessagingService _messagingService;
   private GenericHelixController _controller;
   private Long _lastSessionSyncTime;
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
@@ -128,6 +127,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   final Timer _timer;
 
   public HelixTaskExecutor() {
+    this(new ParticipantStatusMonitor(false, null));
+  }
+
+  public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
     _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
@@ -137,18 +140,13 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
-    _monitor = new ParticipantStatusMonitor();
+    _monitor = participantStatusMonitor;
 
     _timer = new Timer(true); // created as a daemon timer thread to handle task timeout
 
     startMonitorThread();
   }
 
-  public HelixTaskExecutor(ClusterMessagingService messagingService) {
-    this();
-    _messagingService = messagingService;
-  }
-
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
     registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
@@ -597,6 +595,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     // Update message count
     _messageQueueMonitor.setMessageQueueBacklog(messages.size());
+    _monitor.incrementReceivedMessages(messages.size());
 
     // sort message by creation timestamp, so message created earlier is processed first
     Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
index e2e0082..31e8fb6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ParticipantStatusMonitor.java
@@ -27,6 +27,7 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.monitoring.mbeans.StateTransitionStatMonitor;
 import org.apache.log4j.Logger;
 
@@ -36,10 +37,15 @@ public class ParticipantStatusMonitor {
   private static final Logger LOG = Logger.getLogger(ParticipantStatusMonitor.class);
 
   private MBeanServer _beanServer;
+  private ParticipantMessageMonitor _messageMonitor;
 
-  public ParticipantStatusMonitor() {
+  public ParticipantStatusMonitor(boolean isParticipant, String instanceName) {
     try {
       _beanServer = ManagementFactory.getPlatformMBeanServer();
+      if (isParticipant) {
+        _messageMonitor = new ParticipantMessageMonitor(instanceName);
+        register(_messageMonitor, getObjectName(_messageMonitor.getParticipantBeanName()));
+      }
     } catch (Exception e) {
       LOG.warn(e);
       e.printStackTrace();
@@ -47,6 +53,12 @@ public class ParticipantStatusMonitor {
     }
   }
 
+  public void incrementReceivedMessages(int num) {
+    if (_messageMonitor != null) {  // is participant
+      _messageMonitor.incrementReceivedMessages(num);
+    }
+  }
+
   public void reportTransitionStat(StateTransitionContext cxt, StateTransitionDataPoint data) {
     if (_beanServer == null) {
       LOG.warn("bean server is null, skip reporting");
@@ -95,6 +107,16 @@ public class ParticipantStatusMonitor {
   }
 
   public void shutDown() {
+    if (_messageMonitor != null) {  // is participant
+      try {
+        ObjectName name = getObjectName(_messageMonitor.getParticipantBeanName());
+        if (_beanServer.isRegistered(name)) {
+          _beanServer.unregisterMBean(name);
+        }
+      } catch (Exception e) {
+        LOG.warn("fail to unregister " + _messageMonitor.getParticipantBeanName(), e);
+      }
+    }
     for (StateTransitionContext cxt : _monitorMap.keySet()) {
       try {
         ObjectName name = getObjectName(cxt.toString());

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
new file mode 100644
index 0000000..d7d6fab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -0,0 +1,32 @@
+package org.apache.helix.monitoring.mbeans;
+
+public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
+
+  private static final String PARTICIPANT_KEY = "ParticipantName";
+  private static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
+  private final String _participantName;
+  private long _receivedMessages = 0;
+
+  public ParticipantMessageMonitor(String participantName) {
+    _participantName = participantName;
+  }
+
+  public String getParticipantBeanName() {
+    return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
+  }
+
+  public void incrementReceivedMessages(int count) {
+    _receivedMessages+=count;
+  }
+
+  @Override
+  public long getReceivedMessages() {
+    return _receivedMessages;
+  }
+
+  @Override
+  public String getSensorName() {
+    return PARTICIPANT_STATUS_KEY + "." + "_participantName";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
new file mode 100644
index 0000000..cfbc755
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -0,0 +1,8 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+
+public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
+  public long getReceivedMessages();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 9688c0d..c60a693 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -31,7 +31,6 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 0167487..3e832cb 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -39,7 +39,6 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.participant.StateMachineEngine;

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index 0191eaa..a01cd36 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -100,7 +100,7 @@ public class TestParticipantMonitor {
   public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
       NullPointerException, IOException, InterruptedException {
     System.out.println("START TestParticipantMonitor");
-    ParticipantStatusMonitor monitor = new ParticipantStatusMonitor();
+    ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false, null);
 
     int monitorNum = 0;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/092b73a7/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index db6974e..3d3fdd5 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -41,7 +41,6 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;


Mime
View raw message