helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 32/44: Fix critical Task Framework throttle bug
Date Sat, 25 May 2019 01:20:06 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cd821cd620b6821cbe3d4ccf7072efeb3b924d32
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu May 9 15:59:59 2019 -0700

    Fix critical Task Framework throttle bug
    
    Task throttling feature had a logical bug where it wouldn't count any of the pending task
assignments, which was breaking task throttling. This diff fixes it.
    
    RB=1661127
    BUG=HELIX-1875
    G=helix-reviewers
    A=jjwang
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../org/apache/helix/ClusterMessagingService.java  | 17 +++----
 .../stages/CurrentStateComputationStage.java       |  5 ++
 .../controller/stages/CurrentStateOutput.java      | 35 +++++++++-----
 .../integration/manager/TestZkHelixAdmin.java      | 44 +++++++++++++++++-
 .../spectator/TestRoutingTableSnapshot.java        |  3 +-
 .../helix/manager/zk/TestZNRecordSizeLimit.java    | 53 +++++++++++-----------
 .../TestClusterStatusMonitorLifecycle.java         | 49 ++++----------------
 7 files changed, 117 insertions(+), 89 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
index 28188e0..96a5957 100644
--- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -37,7 +37,7 @@ import org.apache.helix.model.Message;
 public interface ClusterMessagingService {
   /**
    * Send message matching the specifications mentioned in recipientCriteria.
-   * @param recipientCriteria criteria to be met, defined as {@link Criteria}
+   * @param receipientCriteria criteria to be met, defined as {@link Criteria}
    * @See Criteria
    * @param message
    *          message to be sent. Some attributes of this message will be
@@ -55,24 +55,24 @@ public interface ClusterMessagingService {
    * This is useful when message need to be sent and current thread need not
    * wait for response since processing will be done in another thread.
    * @see #send(Criteria, Message)
-   * @param recipientCriteria
+   * @param receipientCriteria
    * @param message
    * @param callbackOnReply callback to trigger on completion
    * @param timeOut Time to wait before failing the send
    * @return the number of messages that were successfully sent
    */
-  int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply, int
timeOut);
+  int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply, int
timeOut);
 
   /**
    * @see #send(Criteria, Message, AsyncCallback, int)
-   * @param recipientCriteria
+   * @param receipientCriteria
    * @param message
    * @param callbackOnReply
    * @param timeOut
    * @param retryCount maximum number of times to retry the send
    * @return the number of messages that were successfully sent
    */
-  int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply,
+  int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply,
       int timeOut, int retryCount);
 
   /**
@@ -86,13 +86,14 @@ public interface ClusterMessagingService {
    * The current thread can use callbackOnReply instance to store application
    * specific data.
    * @see #send(Criteria, Message, AsyncCallback, int)
-   * @param recipientCriteria
+   * @param receipientCriteria
    * @param message
    * @param callbackOnReply
    * @param timeOut
+   * @param retryCount
    * @return the number of messages that were successfully sent
    */
-  int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply,
+  int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply,
       int timeOut);
 
   /**
@@ -143,7 +144,7 @@ public interface ClusterMessagingService {
   /**
    * This will generate all messages to be sent given the recipientCriteria and MessageTemplate,
    * the messages are not sent.
-   * @param recipientCriteria criteria to be met, defined as {@link Criteria}
+   * @param receipientCriteria criteria to be met, defined as {@link Criteria}
    * @param messageTemplate the Message on which to base the messages to send
    * @return messages to be sent, grouped by the type of instance to send the message to
    */
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 72d3688..0bf4d28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -121,6 +121,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           }
         }
       }
+
+      // Add the state model into the map for lookup of Task Framework pending partitions
+      if (resource.getStateModelDefRef() != null) {
+        currentStateOutput.setResourceStateModelDef(resourceName, resource.getStateModelDefRef());
+      }
     }
 
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index b634703..13e1dbf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -110,8 +110,8 @@ public class CurrentStateOutput {
     _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
   }
 
-  public void setEndTime(String resourceName, Partition partition,
-      String instanceName, Long timestamp) {
+  public void setEndTime(String resourceName, Partition partition, String instanceName,
+      Long timestamp) {
     if (!_currentStateEndTimeMap.containsKey(resourceName)) {
       _currentStateEndTimeMap.put(resourceName, new HashMap<Partition, Map<String,
Long>>());
     }
@@ -193,8 +193,7 @@ public class CurrentStateOutput {
     return null;
   }
 
-  public Long getEndTime(String resourceName, Partition partition,
-      String instanceName) {
+  public Long getEndTime(String resourceName, Partition partition, String instanceName) {
     Map<Partition, Map<String, Long>> partitionInfo = _currentStateEndTimeMap.get(resourceName);
     if (partitionInfo != null) {
       Map<String, Long> instanceInfo = partitionInfo.get(partition);
@@ -279,7 +278,7 @@ public class CurrentStateOutput {
    */
   public Map<Partition, Map<String, String>> getCurrentStateMap(String resourceName)
{
     if (_currentStateMap.containsKey(resourceName)) {
-      return  _currentStateMap.get(resourceName);
+      return _currentStateMap.get(resourceName);
     }
     return Collections.emptyMap();
   }
@@ -356,32 +355,43 @@ public class CurrentStateOutput {
   }
 
   /**
-   * Get the partitions count for each participant with the pending state and given resource
state model
+   * Get the partitions count for each participant with the pending state and given resource
state
+   * model
    * @param resourceStateModel specified resource state model to look up
    * @param state specified pending resource state to look up
    * @return set of participants to partitions mapping
    */
-  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel,
String state) {
+  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel,
+      String state) {
     return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap);
   }
 
   /**
-   * Get the partitions count for each participant in the current state and with given resource
state model
+   * Get the partitions count for each participant in the current state and with given resource
+   * state model
    * @param resourceStateModel specified resource state model to look up
    * @param state specified current resource state to look up
    * @return set of participants to partitions mapping
    */
-  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel,
String state) {
+  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel,
+      String state) {
     return getPartitionCountWithState(resourceStateModel, state, (Map) _currentStateMap);
   }
 
+  /**
+   * Count partitions in pendingStates and currentStates.
+   * @param resourceStateModel
+   * @param state
+   * @param stateMap
+   * @return
+   */
   private Map<String, Integer> getPartitionCountWithState(String resourceStateModel,
String state,
       Map<String, Map<Partition, Map<String, Object>>> stateMap) {
     Map<String, Integer> currentPartitionCount = new HashMap<>();
     for (String resource : stateMap.keySet()) {
       String stateModel = _resourceStateModelMap.get(resource);
-      if ((stateModel != null && stateModel.equals(resourceStateModel)) || (stateModel
== null
-          && resourceStateModel == null)) {
+      if ((stateModel != null && stateModel.equals(resourceStateModel))
+          || (stateModel == null && resourceStateModel == null)) {
         for (Partition partition : stateMap.get(resource).keySet()) {
           Map<String, Object> partitionMessage = stateMap.get(resource).get(partition);
           for (Map.Entry<String, Object> participantMap : partitionMessage.entrySet())
{
@@ -399,7 +409,8 @@ public class CurrentStateOutput {
                 currState = curStateObj.toString();
               }
             }
-            if ((currState != null && currState.equals(state)) || (currState == null
&& state == null)) {
+            if ((currState != null && currState.equals(state))
+                || (currState == null && state == null)) {
               currentPartitionCount.put(participant, currentPartitionCount.get(participant)
+ 1);
             }
           }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 5141a8d..0dfdfb4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -19,16 +19,20 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.integration.task.TaskTestBase;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -57,6 +61,26 @@ public class TestZkHelixAdmin extends TaskTestBase {
   }
 
   @Test
+  public void testViewClusterOperations() {
+    String testCluster = "testViewCluster";
+    List<ViewClusterSourceConfig> sourceConfigs = generateViewClusterSourceConfig();
+    int refreshPeriod = 10;
+
+    _admin.addCluster(testCluster);
+    ClusterConfig config = _configAccessor.getClusterConfig(testCluster);
+    config.setViewCluster();
+    config.setViewClusterRefreshPeriod(refreshPeriod);
+    config.setViewClusterSourceConfigs(sourceConfigs);
+    _configAccessor.setClusterConfig(testCluster, config);
+
+    ClusterConfig fetchedConfig = _configAccessor.getClusterConfig(testCluster);
+    Assert.assertTrue(fetchedConfig.isViewCluster());
+    Assert.assertEquals(fetchedConfig.getViewClusterSourceConfigs().size(), sourceConfigs.size());
+    Assert.assertEquals(fetchedConfig.getViewClusterRefershPeriod(), refreshPeriod);
+    _admin.dropCluster(testCluster);
+  }
+
+  @Test
   public void testEnableDisablePartitions() throws InterruptedException {
     _admin.enablePartition(false, CLUSTER_NAME, (PARTICIPANT_PREFIX + "_" + _startPort),
         WorkflowGenerator.DEFAULT_TGT_DB, Arrays.asList(new String[] { "TestDB_0", "TestDB_2"
}));
@@ -88,4 +112,22 @@ public class TestZkHelixAdmin extends TaskTestBase {
     Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED);
     Assert.assertEquals(jobContext.getPartitionState(2), null);
   }
-}
\ No newline at end of file
+
+  private List<ViewClusterSourceConfig> generateViewClusterSourceConfig() {
+    String clusterNamePrefix = "mySourceCluster";
+    String zkConnection = "zookeeper.test.com:2121";
+    String testJsonTemplate =
+        "{\"name\": \"%s\", \"zkAddress\": \"%s\", \"properties\": [\"%s\", \"%s\", \"%s\"]}";
+
+    List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      String clusterName = clusterNamePrefix + i;
+      String configJSON = String
+          .format(testJsonTemplate, clusterName, zkConnection, PropertyType.INSTANCES.name(),
+              PropertyType.EXTERNALVIEW.name(), PropertyType.LIVEINSTANCES.name());
+
+      sourceConfigs.add(ViewClusterSourceConfig.fromJson(configJSON));
+    }
+    return sourceConfigs;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
index 3b498c3..216c900 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
@@ -132,4 +132,5 @@ public class TestRoutingTableSnapshot extends ZkTestBase {
       Assert.assertEquals(slaveInsEv.size(), 2);
     }
   }
-}
\ No newline at end of file
+}
+
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index bccb425..36e26e7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -81,23 +81,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     _gZkClient.createPersistent(path2, true);
     try {
       _gZkClient.writeData(path2, largeRecord);
+      Assert.fail("Should fail because data size is larger than 1M");
     } catch (HelixException e) {
-      Assert.fail("Should not fail because data size is larger than 1M since compression
applied");
+      // OK
     }
     record = _gZkClient.readData(path2);
-    Assert.assertNotNull(record);
+    Assert.assertNull(record);
 
     // oversized write doesn't overwrite existing data on zk
     record = _gZkClient.readData(path1);
     try {
       _gZkClient.writeData(path1, largeRecord);
+      Assert.fail("Should fail because data size is larger than 1M");
     } catch (HelixException e) {
-      Assert.fail("Should not fail because data size is larger than 1M since compression
applied");
+      // OK
     }
     ZNRecord recordNew = _gZkClient.readData(path1);
     byte[] arr = serializer.serialize(record);
     byte[] arrNew = serializer.serialize(recordNew);
-    Assert.assertFalse(Arrays.equals(arr, arrNew));
+    Assert.assertTrue(Arrays.equals(arr, arrNew));
 
     // test ZkDataAccessor
     ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
@@ -119,7 +121,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       idealState.getRecord().setSimpleField(i + "", bufStr);
     }
     boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-    Assert.assertTrue(succeed);
+    Assert.assertFalse(succeed);
     HelixProperty property =
         accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918", "session_1",
             "partition_1"));
@@ -149,11 +151,11 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
     }
     // System.out.println("record: " + idealState.getRecord());
     succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
-    Assert.assertTrue(succeed);
+    Assert.assertFalse(succeed);
     recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
     arr = serializer.serialize(record);
     arrNew = serializer.serialize(recordNew);
-    Assert.assertFalse(Arrays.equals(arr, arrNew));
+    Assert.assertTrue(Arrays.equals(arr, arrNew));
 
     System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
         + new Date(System.currentTimeMillis()));
@@ -162,12 +164,12 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
   @Test
   public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
     String className = getShortClassName();
-    System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " +
new Date(
-        System.currentTimeMillis()));
+    System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+        + new Date(System.currentTimeMillis()));
 
     ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
-    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+    HelixZkClient zkClient =
+        SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
 
     try {
       zkClient.setZkSerializer(serializer);
@@ -205,25 +207,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       zkClient.createPersistent(path2, true);
       try {
         zkClient.writeData(path2, largeRecord);
+        Assert.fail("Should fail because data size is larger than 1M");
       } catch (HelixException e) {
-        Assert
-            .fail("Should not fail because data size is larger than 1M since compression
applied");
+        // OK
       }
       record = zkClient.readData(path2);
-      Assert.assertNotNull(record);
+      Assert.assertNull(record);
 
       // oversized write doesn't overwrite existing data on zk
       record = zkClient.readData(path1);
       try {
         zkClient.writeData(path1, largeRecord);
+        Assert.fail("Should fail because data size is larger than 1M");
       } catch (HelixException e) {
-        Assert
-            .fail("Should not fail because data size is larger than 1M since compression
applied");
+        // OK
       }
       ZNRecord recordNew = zkClient.readData(path1);
       byte[] arr = serializer.serialize(record);
       byte[] arrNew = serializer.serialize(recordNew);
-      Assert.assertFalse(Arrays.equals(arr, arrNew));
+      Assert.assertTrue(Arrays.equals(arr, arrNew));
 
       // test ZkDataAccessor
       ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
@@ -232,8 +234,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       admin.addInstance(className, instanceConfig);
 
       // oversized data should not create any new data on zk
-      ZKHelixDataAccessor accessor =
-          new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
+      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
       Builder keyBuilder = accessor.keyBuilder();
 
       // ZNRecord statusUpdates = new ZNRecord("statusUpdates");
@@ -246,9 +247,9 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
         idealState.getRecord().setSimpleField(i + "", bufStr);
       }
       boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
-      Assert.assertTrue(succeed);
+      Assert.assertFalse(succeed);
       HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
-      Assert.assertNotNull(property);
+      Assert.assertNull(property);
 
       // legal sized data gets written to zk
       idealState.getRecord().getSimpleFields().clear();
@@ -275,18 +276,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
       }
       // System.out.println("record: " + idealState.getRecord());
       succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
-      Assert.assertTrue(succeed);
+      Assert.assertFalse(succeed);
       recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
       arr = serializer.serialize(record);
       arrNew = serializer.serialize(recordNew);
-      Assert.assertFalse(Arrays.equals(arr, arrNew));
-    } catch (HelixException ex) {
-      Assert.fail("Should not fail because data size is larger than 1M since compression
applied");
+      Assert.assertTrue(Arrays.equals(arr, arrNew));
     } finally {
       zkClient.close();
     }
 
-    System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new
Date(
-        System.currentTimeMillis()));
+    System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
+        + new Date(System.currentTimeMillis()));
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 6156666..f84faf5 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -19,26 +19,21 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.Query;
 import javax.management.QueryExp;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -67,8 +62,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     String className = TestHelper.getTestClassName();
     _clusterNamePrefix = className;
 
-    System.out
-        .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
+    System.out.println("START " + _clusterNamePrefix + " at "
+        + new Date(System.currentTimeMillis()));
 
     // setup 10 clusters
     for (int i = 0; i < clusterNb; i++) {
@@ -97,7 +92,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         clusterNb, // partitions per resource
         n, // number of nodes
         3, // replicas
-        "LeaderStandby", true); // do rebalance
+        "LeaderStandby",
+        true); // do rebalance
 
     // start distributed cluster controllers
     _controllers = new ClusterDistributedController[n + n];
@@ -177,32 +173,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
   }
 
-  class ParticipantMonitorListener extends ClusterMBeanObserver {
-
-    int _nMbeansUnregistered = 0;
-    int _nMbeansRegistered = 0;
-
-    public ParticipantMonitorListener(String domain)
-        throws InstanceNotFoundException, IOException, MalformedObjectNameException,
-        NullPointerException {
-      super(domain);
-    }
-
-    @Override
-    public void onMBeanRegistered(MBeanServerConnection server,
-        MBeanServerNotification mbsNotification) {
-      LOG.info("Register mbean: " + mbsNotification.getMBeanName());
-      _nMbeansRegistered++;
-    }
-
-    @Override
-    public void onMBeanUnRegistered(MBeanServerConnection server,
-        MBeanServerNotification mbsNotification) {
-      LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
-      _nMbeansUnregistered++;
-    }
-  }
-
   private void cleanupControllers() {
     for (int i = 0; i < _controllers.length; i++) {
       if (_controllers[i] != null && _controllers[i].isConnected()) {
@@ -326,7 +296,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     cleanupControllers();
     // Check if any MBeans leftover.
     // Note that MessageQueueStatus is not bound with controller only. So it will still exist.
-
     final QueryExp exp2 = Query.and(
         Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
         exp1);


Mime
View raw message