helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 07/10: HELIX: Bypass throttling for disabled partitions
Date Thu, 28 Mar 2019 19:31:51 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ddfc933e1469906d49fcccb08a66352d4b77ccd0
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:29:16 2019 -0700

    HELIX: Bypass throttling for disabled partitions
    
        This diff allows all state transitions linked to disabled instances/partitions to bypass throttling constraints.
        Changelist:
        1. Modify logic in IntermediateStateCalcStage
        2. Add more integration tests
---
 .../stages/IntermediateStateCalcStage.java         |  80 ++++++++----
 .../TestNoThrottleDisabledPartitions.java          | 144 ++++++++++++++++++++-
 2 files changed, 199 insertions(+), 25 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index f021f66..888bd12 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -362,13 +362,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
-        partitionsNeedRecovery, partitionsNeedLoadBalance);
+        partitionsNeedRecovery, partitionsNeedLoadBalance, cache);
 
     // Perform recovery balance
     Set<Partition> recoveryThrottledPartitions =
         recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
             intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
-            cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
+            cache.getStateModelDef(resource.getStateModelDefRef()).getTopState(), cache);
 
     // Perform load balance upon checking conditions below
     Set<Partition> loadbalanceThrottledPartitions;
@@ -398,7 +398,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
         bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
         partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName),
-        onlyDownwardLoadBalance, stateModelDef);
+        onlyDownwardLoadBalance, stateModelDef, cache);
 
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(),
@@ -461,7 +461,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
       StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance) {
+      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache) {
     String resourceName = resource.getResourceName();
 
     // check and charge pending transitions
@@ -481,17 +481,22 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       }
 
       if (pendingMap.size() > 0) {
-        throttleController.chargeCluster(rebalanceType);
-        throttleController.chargeResource(rebalanceType, resourceName);
-
-        // charge each instance.
+        boolean shouldChargePartition = false;
         for (String instance : pendingMap.keySet()) {
           String currentState = currentStateMap.get(instance);
           String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)) {
+          if (pendingState != null && !pendingState.equals(currentState)
+              && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+                  .contains(instance)) {
+            // Only charge this instance if the partition is not disabled
             throttleController.chargeInstance(rebalanceType, instance);
+            shouldChargePartition = true;
           }
         }
+        if (shouldChargePartition) {
+          throttleController.chargeCluster(rebalanceType);
+          throttleController.chargeResource(rebalanceType, resourceName);
+        }
       }
     }
   }
@@ -508,13 +513,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    * @param partitionsNeedRecovery
    * @param currentStateOutput
    * @param topState
+   * @param cache
    * @return a set of partitions that need recovery but did not get recovered due to throttling
    */
   private Set<Partition> recoveryRebalance(Resource resource,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery,
-      CurrentStateOutput currentStateOutput, String topState) {
+      CurrentStateOutput currentStateOutput, String topState,
+      ResourceControllerDataProvider cache) {
     String resourceName = resource.getResourceName();
     Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
 
@@ -540,7 +547,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     for (Partition partition : partitionsNeedRecoveryPrioritized) {
       throttleStateTransitionsForPartition(throttleController, resourceName, partition,
           currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
+          intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, cache);
     }
    LogUtil.logInfo(logger, _eventId, String.format(
        "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery"
@@ -563,6 +570,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    * @param currentStateMap
    * @param onlyDownwardLoadBalance true when only allowing downward transitions
   * @param stateModelDef for determining whether a partition's transitions are strictly downward
+   * @param cache
    * @return
    */
   private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
@@ -570,7 +578,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
       Map<Partition, Map<String, String>> currentStateMap, boolean onlyDownwardLoadBalance,
-      StateModelDefinition stateModelDef) {
+      StateModelDefinition stateModelDef, ResourceControllerDataProvider cache) {
     String resourceName = resource.getResourceName();
     Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
 
@@ -608,7 +616,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       }
       throttleStateTransitionsForPartition(throttleController, resourceName, partition,
           currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
+          intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
     }
     LogUtil.logInfo(logger, _eventId, String.format(
         "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
@@ -628,12 +636,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    * @param partitionsThrottled
    * @param intermediatePartitionStateMap
    * @param rebalanceType
+   * @param cache
    */
   private void throttleStateTransitionsForPartition(
       StateTransitionThrottleController throttleController, String resourceName,
       Partition partition, CurrentStateOutput currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
-      PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType) {
+      PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
+      ResourceControllerDataProvider cache) {
 
     Map<String, String> currentStateMap =
         currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -655,7 +665,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       for (String instance : allInstances) {
         String currentState = currentStateMap.get(instance);
         String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+                .contains(instance)) {
           if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
             hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
@@ -669,24 +681,46 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       }
     }
     if (!hasReachedThrottlingLimit) {
-      // This implies that there is room for more pending states. Find
-      // instances with a replica whose current state is different from BestPossibleState and
+      // This implies that there is room for more state transitions.
+      // Find instances with a replica whose current state is different from BestPossibleState and
       // "charge" for it, and bestPossibleStates will become intermediate states
       intermediateMap.putAll(bestPossibleMap);
+      boolean shouldChargeForPartition = false;
       for (String instance : allInstances) {
         String currentState = currentStateMap.get(instance);
         String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+                .contains(instance)) {
           throttleController.chargeInstance(rebalanceType, instance);
+          shouldChargeForPartition = true;
         }
       }
-      throttleController.chargeCluster(rebalanceType);
-      throttleController.chargeResource(rebalanceType, resourceName);
+      if (shouldChargeForPartition) {
+        throttleController.chargeCluster(rebalanceType);
+        throttleController.chargeResource(rebalanceType, resourceName);
+      }
     } else {
-      // No more room for more pending states; current states will just become intermediate states
+      // No more room for more state transitions; current states will just become intermediate
+      // states unless the partition is disabled
       // Add this partition to a set of throttled partitions
-      intermediateMap.putAll(currentStateMap);
-      partitionsThrottled.add(partition);
+      for (String instance : allInstances) {
+        String currentState = currentStateMap.get(instance);
+        String bestPossibleState = bestPossibleMap.get(instance);
+        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+            && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+                .contains(instance)) {
+          // Because this partition is disabled, we allow assignment
+          intermediateMap.put(instance, bestPossibleState);
+        } else {
+          // This partition is not disabled, so it must be throttled by just passing on the current
+          // state
+          if (currentState != null) {
+            intermediateMap.put(instance, currentState);
+          }
+          partitionsThrottled.add(partition);
+        }
+      }
     }
     intermediatePartitionStateMap.setState(partition, intermediateMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index 73bdcf8..fa0fe6e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -169,7 +170,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
    * and no throttle config set for recovery balance
    * and throttle config of 1 set for load balance,
    * Instead of disabling the instance, we disable the partition in the instance config.
-   * Here, we set the recovery balance config. Then we should still see the MASTER.
+   * Here, we set the recovery balance config to 0. But we should still see the downward transition
+   * regardless.
    * * instance 1 : S (M->S->Offline)
    * * instance 2 : M (S->M because it's in recovery)
    * * instance 3 : S
@@ -205,7 +207,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
         for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
             .entrySet()) {
           if (partitionState.getKey().equals("TestDB0_2")) {
-            Assert.assertTrue(partitionState.getValue().equals("MASTER"));
+            Assert.assertFalse(partitionState.getValue().equals("MASTER"));
           }
         }
       }
@@ -219,6 +221,85 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
     deleteCluster(_clusterName);
   }
 
+  @Test
+  public void testNoThrottleOnDisabledInstance() throws Exception {
+    int participantCount = 5;
+    setupEnvironment(participantCount);
+    setThrottleConfig();
+
+    // Disable an instance so that transitions on it are not subject to throttling
+    PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
+    InstanceConfig instanceConfig = _accessor.getProperty(key);
+    instanceConfig.setInstanceEnabled(false);
+    _accessor.setProperty(key, instanceConfig);
+
+    // Use a long transition delay so transitions are processed slowly and messages can be observed
+    DelayedTransitionBase.setDelay(1000000L);
+
+    // Start a controller now that the throttle configs and the disabled instance are in place
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    Thread.sleep(500L); // NOTE(review): fixed wait; may be flaky on slow hosts
+
+    // Expect more than one message on the disabled Participant despite the throttle limit of 1
+    Assert.assertTrue(verifyMultipleMessages(_participants[0]));
+
+    // clean up the cluster
+    controller.syncStop();
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i].syncStop();
+    }
+    deleteCluster(_clusterName);
+  }
+
+  @Test
+  public void testNoThrottleOnDisabledPartition() throws Exception {
+    int participantCount = 3;
+    setupEnvironment(participantCount);
+    setThrottleConfig();
+
+    // Disable one partition on every instance so that its transitions bypass throttling
+    String partitionName = _resourceName + "0_0";
+    for (int i = 0; i < participantCount; i++) {
+      disablePartitionOnInstance(_participants[i], _resourceName + "0", partitionName);
+    }
+
+    String newResource = "abc";
+    IdealState idealState = new FullAutoModeISBuilder(newResource).setStateModel("MasterSlave")
+        .setStateModelFactoryName("DEFAULT").setNumPartitions(5).setNumReplica(3)
+        .setMinActiveReplica(2).setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO)
+        .setRebalancerClass("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer")
+        .setRebalanceStrategy(
+            "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy")
+        .build();
+
+    _gSetupTool.addResourceToCluster(_clusterName, newResource, idealState);
+    _gSetupTool.rebalanceStorageCluster(_clusterName, newResource, 3);
+
+    // Use a long transition delay so transitions are processed slowly and messages can be observed
+    DelayedTransitionBase.setDelay(1000000L);
+
+    // Start the controller. Helix will try to bring the new resource up on all instances, but the
+    // disabled partition will go to OFFLINE. This should allow each instance to have 2 messages
+    // despite the throttle being set at 1
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
+    Thread.sleep(500L); // NOTE(review): fixed wait; may be flaky on slow hosts
+
+    for (MockParticipantManager participantManager : _participants) {
+      Assert.assertTrue(verifyTwoMessages(participantManager));
+    }
+
+    // clean up the cluster
+    controller.syncStop();
+    for (int i = 0; i < participantCount; i++) {
+      _participants[i].syncStop();
+    }
+    deleteCluster(_clusterName);
+  }
+
   /**
    * Set up the cluster and pause the controller.
    * @param participantCount
@@ -253,6 +334,37 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
   }
 
   /**
+   * Set cluster-level (RECOVERY_BALANCE, LOAD_BALANCE, ANY) and instance-level (ANY) throttle
+   * limits of 1 so that tests can detect bypassed throttling by counting ongoing transitions.
+   */
+  private void setThrottleConfig() {
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+
+    ClusterConfig clusterConfig = _accessor.getProperty(keyBuilder.clusterConfig());
+    clusterConfig.setResourcePriorityField("Name");
+    List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+
+    // Add throttling at cluster-level for every rebalance type
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+    throttleConfigs.add(
+        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+    throttleConfigs
+        .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+
+    // Add throttling at instance level (no resource-level limit is configured here)
+    throttleConfigs
+        .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1));
+
+    clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+    _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+  }
+
+  /**
    * Set throttle limits only for load balance so that none of them would happen.
    */
   private void setThrottleConfigForLoadBalance() {
@@ -347,4 +459,32 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
     instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, false);
     _accessor.setProperty(key, instanceConfig);
   }
+
+  /**
+   * Ensure that there is more than one message for a given Participant.
+   * @param participant the participant whose message folder is inspected
+   * @return true if the participant's message folder holds at least two messages
+   */
+  private boolean verifyMultipleMessages(final MockParticipantManager participant) {
+    return countParticipantMessages(participant) > 1;
+  }
+
+  /**
+   * Ensure that there are exactly 2 messages for a given Participant.
+   * @param participant the participant whose message folder is inspected
+   * @return true if the participant's message folder holds exactly two messages
+   */
+  private boolean verifyTwoMessages(final MockParticipantManager participant) {
+    return countParticipantMessages(participant) == 2;
+  }
+
+  /**
+   * Count the messages currently listed under the given Participant's message folder.
+   * @return the number of message names, or 0 if getChildNames returns null
+   */
+  private int countParticipantMessages(final MockParticipantManager participant) {
+    List<String> messageNames =
+        _accessor.getChildNames(_accessor.keyBuilder().messages(participant.getInstanceName()));
+    return messageNames == null ? 0 : messageNames.size();
+  }
 }


Mime
View raw message