helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [12/17] git commit: [HELIX-463] Create useful message queue length jmx
Date Fri, 11 Jul 2014 19:58:07 GMT
[HELIX-463] Create useful message queue length jmx


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2ccaa6b4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2ccaa6b4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2ccaa6b4

Branch: refs/heads/master
Commit: 2ccaa6b434e48415cc56e6e9dbb663a5a289e960
Parents: 0ee1cdc
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Jun 25 17:42:45 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Jul 11 12:49:15 2014 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 23 ++++-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 47 +++-------
 .../mbeans/ClusterStatusMonitorMBean.java       |  6 +-
 .../monitoring/mbeans/MessageQueueMonitor.java  | 94 +++++++++++++++++---
 .../mbeans/MessageQueueMonitorMBean.java        | 11 +--
 .../TestClusterStatusMonitorLifecycle.java      | 26 +++---
 6 files changed, 128 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b507755..c4d7a67 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -47,16 +47,15 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
-import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.monitoring.ParticipantMonitor;
+import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
@@ -104,6 +103,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private final ParticipantMonitor _monitor;
   public static final String MAX_THREADS = "maxThreads";
 
+  private MessageQueueMonitor _messageQueueMonitor;
+
   /**
    * Map of MsgType->MsgHandlerFactoryRegistryItem
    */
@@ -441,6 +442,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
   void reset() {
     LOG.info("Reset HelixTaskExecutor");
+
+    if (_messageQueueMonitor != null) {
+      _messageQueueMonitor.reset();
+    }
+
     for (String msgType : _hdlrFtyRegistry.keySet()) {
       // don't un-register factories, just shutdown all executors
       ExecutorService pool = _executorMap.remove(msgType);
@@ -466,6 +472,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   void init() {
     LOG.info("Init HelixTaskExecutor");
 
+    if (_messageQueueMonitor != null) {
+      _messageQueueMonitor.init();
+    }
+
     // Re-init all existing factories
     for (String msgType : _hdlrFtyRegistry.keySet()) {
       MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
@@ -484,6 +494,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
 
+    HelixManager manager = changeContext.getManager();
+    if (_messageQueueMonitor == null) {
+      _messageQueueMonitor =
+          new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName());
+    }
+
     // If FINALIZE notification comes, reset all handler factories
     // and terminate all the thread pools
     // TODO: see if we should have a separate notification call for resetting
@@ -505,7 +521,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     // sort message by creation timestamp, so message created earlier is processed first
     Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
 
-    HelixManager manager = changeContext.getManager();
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1cce342..9ea51b1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.MBeanServer;
@@ -48,6 +47,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
@@ -60,6 +60,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   static final String CLUSTER_DN_KEY = "cluster";
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
+  static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
 
   public static final String DEFAULT_TAG = "DEFAULT";
 
@@ -70,13 +71,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private Set<String> _instances = Collections.emptySet();
   private Set<String> _disabledInstances = Collections.emptySet();
   private Map<String, Set<String>> _disabledPartitions = Collections.emptyMap();
+  private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
       new ConcurrentHashMap<String, ResourceMonitor>();
 
-  private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
-      new ConcurrentHashMap<String, MessageQueueMonitor>();
-
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
       new ConcurrentHashMap<String, InstanceMonitor>();
 
@@ -133,24 +132,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMaxMessageQueueSizeGauge() {
     long maxQueueSize = 0;
-    for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values()) {
-      if (msgQueue.getMaxMessageQueueSize() > maxQueueSize) {
-        maxQueueSize = (long) msgQueue.getMaxMessageQueueSize();
+    for (Long queueSize : _instanceMsgQueueSizes.values()) {
+      if (queueSize > maxQueueSize) {
+        maxQueueSize = queueSize;
       }
     }
-
     return maxQueueSize;
   }
 
   @Override
-  public String getMessageQueueSizes() {
-    Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
-    for (String instance : _instanceMsgQueueMbeanMap.keySet()) {
-      MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
-      msgQueueSizes.put(instance, new Long((long) msgQueue.getMaxMessageQueueSize()));
+  public long getInstanceMessageQueueBacklog() {
+    long sum = 0;
+    for (Long queueSize : _instanceMsgQueueSizes.values()) {
+      sum += queueSize;
     }
-
-    return msgQueueSizes.toString();
+    return sum;
   }
 
   private void register(Object bean, ObjectName name) {
@@ -349,20 +345,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void addMessageQueueSize(String instanceName, int msgQueueSize) {
-    try {
-      if (!_instanceMsgQueueMbeanMap.containsKey(instanceName)) {
-        synchronized (this) {
-          if (!_instanceMsgQueueMbeanMap.containsKey(instanceName)) {
-            MessageQueueMonitor bean = new MessageQueueMonitor(_clusterName, instanceName);
-            _instanceMsgQueueMbeanMap.put(instanceName, bean);
-          }
-        }
-      }
-      _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
-    } catch (Exception e) {
-      LOG.error("Fail to add message queue size to mbean, instance: " + instanceName, e);
-    }
+  public void addMessageQueueSize(String instanceName, long msgQueueSize) {
+    _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
   }
 
   public void reset() {
@@ -372,10 +356,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
       _resourceMbeanMap.clear();
 
-      for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
-        bean.reset();
-      }
-      _instanceMsgQueueMbeanMap.clear();
+      _instanceMsgQueueSizes.clear();
 
       unregisterInstances(_instanceMbeanMap.keySet());
       _instanceMbeanMap.clear();

http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 9070aaa..9d15af9 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -32,15 +32,13 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
 
   /**
    * The max message queue size across all instances including controller
-   * will report to ingraph
    * @return
    */
   public long getMaxMessageQueueSizeGauge();
 
   /**
-   * Get all message queue sizes as a string
-   * will NOT report to ingraph
+   * The sum of all message queue sizes for instances in this cluster
    * @return
    */
-  public String getMessageQueueSizes();
+  public long getInstanceMessageQueueBacklog();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
index 6b8b9e3..1ebf851 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
@@ -19,42 +19,108 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import org.apache.helix.monitoring.StatCollector;
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.log4j.Logger;
 
 public class MessageQueueMonitor implements MessageQueueMonitorMBean {
   private static final Logger LOG = Logger.getLogger(MessageQueueMonitor.class);
 
-  private final StatCollector _messageQueueSizeStat;
   private final String _clusterName;
   private final String _instanceName;
+  private final MBeanServer _beanServer;
+  private long _messageQueueBacklog;
 
   public MessageQueueMonitor(String clusterName, String instanceName) {
     _clusterName = clusterName;
     _instanceName = instanceName;
-    _messageQueueSizeStat = new StatCollector();
+    _beanServer = ManagementFactory.getPlatformMBeanServer();
+    _messageQueueBacklog = 0;
   }
 
-  public void addMessageQueueSize(long size) {
-    _messageQueueSizeStat.addData(size);
+  /**
+   * Set the current backlog size for this instance
+   * @param size the message queue size
+   */
+  public void setMessageQueueBacklog(long size) {
+    _messageQueueBacklog = size;
   }
 
-  public void reset() {
-    _messageQueueSizeStat.reset();
+  @Override
+  public long getMessageQueueBacklog() {
+    return _messageQueueBacklog;
   }
 
-  @Override
-  public double getMaxMessageQueueSize() {
-    return _messageQueueSizeStat.getMax();
+  /**
+   * Register this bean with the server
+   */
+  public void init() {
+    try {
+      register(this, getObjectName(getBeanName()));
+    } catch (Exception e) {
+      LOG.error("Fail to register MessageQueueMonitor", e);
+    }
   }
 
-  @Override
-  public double getMeanMessageQueueSize() {
-    return _messageQueueSizeStat.getMean();
+  /**
+   * Remove this bean from the server
+   */
+  public void reset() {
+    _messageQueueBacklog = 0;
+    try {
+      unregister(getObjectName(getBeanName()));
+    } catch (Exception e) {
+      LOG.error("Fail to register MessageQueueMonitor", e);
+    }
   }
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "." + _clusterName + "." + _instanceName;
+    return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "." + _clusterName;
+  }
+
+  private void register(Object bean, ObjectName name) {
+    try {
+      if (_beanServer.isRegistered(name)) {
+        _beanServer.unregisterMBean(name);
+      }
+    } catch (Exception e) {
+      // OK
+    }
+
+    try {
+      LOG.info("Register MBean: " + name);
+      _beanServer.registerMBean(bean, name);
+    } catch (Exception e) {
+      LOG.warn("Could not register MBean: " + name, e);
+    }
+  }
+
+  private void unregister(ObjectName name) {
+    try {
+      if (_beanServer.isRegistered(name)) {
+        LOG.info("Unregistering " + name.toString());
+        _beanServer.unregisterMBean(name);
+      }
+    } catch (Exception e) {
+      LOG.warn("Could not unregister MBean: " + name, e);
+    }
+  }
+
+  private String getClusterBeanName() {
+    return String.format("%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, _clusterName);
+  }
+
+  private String getBeanName() {
+    return String.format("%s,%s=%s", getClusterBeanName(),
+        ClusterStatusMonitor.MESSAGE_QUEUE_DN_KEY, _instanceName);
+  }
+
+  public ObjectName getObjectName(String name) throws MalformedObjectNameException {
+    return new ObjectName(String.format("%s: %s", ClusterStatusMonitor.CLUSTER_STATUS_KEY, name));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
index acf3824..14c0af5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java
@@ -23,15 +23,8 @@ import org.apache.helix.monitoring.SensorNameProvider;
 
 public interface MessageQueueMonitorMBean extends SensorNameProvider {
   /**
-   * Get the max message queue size
+   * Get the message queue size
    * @return
    */
-  public double getMaxMessageQueueSize();
-
-  /**
-   * Get the mean message queue size
-   * @return
-   */
-  public double getMeanMessageQueueSize();
-
+  public long getMessageQueueBacklog();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2ccaa6b4/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 0981a2e..b062282 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -150,10 +150,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
   public void afterClass() {
     System.out.println("Cleaning up...");
     for (int i = 0; i < 5; i++) {
-      boolean result =
-          ClusterStateVerifier
-              .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                  _controllerClusterName));
       _controllers[i].syncStop();
     }
 
@@ -202,10 +198,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // 1 participant goes away
     // No change in instance/resource mbean
-    // Unregister 1 per-instance resource mbean
+    // Unregister 1 per-instance resource mbean and message queue mbean
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 2);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -223,8 +219,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     // 1 cluster status monitor, 1 resource monitor, 5 instances
     // Unregister 1+4+1 per-instance resource mbean
     // Register 4 per-instance resource mbean
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 16);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 12);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -234,8 +230,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     // No change in instance/resource mbean
     // Register 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 16);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 14);
 
     // Add a resource
     // Register 1 resource mbean
@@ -249,16 +245,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 16);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 20);
 
     // Remove a resource
     // No change in instance/resource mbean
     // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 21);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 20);
 
   }
 }


Mime
View raw message