This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit ddfc933e1469906d49fcccb08a66352d4b77ccd0
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:29:16 2019 -0700
HELIX: Bypass throttling for disabled partitions
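This diff lets all state transitions on disabled instances/partitions bypass
throttling constraints: such transitions are neither blocked by, nor charged
against, the cluster-, resource-, or instance-level throttle limits.
Changelist:
1. Modify IntermediateStateCalcStage to skip disabled replicas when charging and checking throttle limits
2. Add integration tests for a disabled instance and a disabled partition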
---
.../stages/IntermediateStateCalcStage.java | 80 ++++++++----
.../TestNoThrottleDisabledPartitions.java | 144 ++++++++++++++++++++-
2 files changed, 199 insertions(+), 25 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index f021f66..888bd12 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -362,13 +362,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
chargePendingTransition(resource, currentStateOutput, throttleController,
- partitionsNeedRecovery, partitionsNeedLoadBalance);
+ partitionsNeedRecovery, partitionsNeedLoadBalance, cache);
// Perform recovery balance
Set<Partition> recoveryThrottledPartitions =
recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
- cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
+ cache.getStateModelDef(resource.getStateModelDefRef()).getTopState(), cache);
// Perform load balance upon checking conditions below
Set<Partition> loadbalanceThrottledPartitions;
@@ -398,7 +398,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName),
- onlyDownwardLoadBalance, stateModelDef);
+ onlyDownwardLoadBalance, stateModelDef, cache);
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(),
@@ -461,7 +461,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
*/
private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
- Set<Partition> partitionsNeedLoadbalance) {
+ Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache)
{
String resourceName = resource.getResourceName();
// check and charge pending transitions
@@ -481,17 +481,22 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
if (pendingMap.size() > 0) {
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
-
- // charge each instance.
+ boolean shouldChargePartition = false;
for (String instance : pendingMap.keySet()) {
String currentState = currentStateMap.get(instance);
String pendingState = pendingMap.get(instance);
- if (pendingState != null && !pendingState.equals(currentState)) {
+ if (pendingState != null && !pendingState.equals(currentState)
+ && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+ .contains(instance)) {
+ // Only charge this instance if the partition is not disabled
throttleController.chargeInstance(rebalanceType, instance);
+ shouldChargePartition = true;
}
}
+ if (shouldChargePartition) {
+ throttleController.chargeCluster(rebalanceType);
+ throttleController.chargeResource(rebalanceType, resourceName);
+ }
}
}
}
@@ -508,13 +513,15 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
* @param partitionsNeedRecovery
* @param currentStateOutput
* @param topState
+ * @param cache
* @return a set of partitions that need recovery but did not get recovered due to throttling
*/
private Set<Partition> recoveryRebalance(Resource resource,
PartitionStateMap bestPossiblePartitionStateMap,
StateTransitionThrottleController throttleController,
PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery,
- CurrentStateOutput currentStateOutput, String topState) {
+ CurrentStateOutput currentStateOutput, String topState,
+ ResourceControllerDataProvider cache) {
String resourceName = resource.getResourceName();
Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
@@ -540,7 +547,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
for (Partition partition : partitionsNeedRecoveryPrioritized) {
throttleStateTransitionsForPartition(throttleController, resourceName, partition,
currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
+ intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, cache);
}
LogUtil.logInfo(logger, _eventId, String.format(
"For resource %s: Num of partitions needing recovery: %d, Num of partitions needing
recovery"
@@ -563,6 +570,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
* @param currentStateMap
* @param onlyDownwardLoadBalance true when only allowing downward transitions
* @param stateModelDef for determining whether a partition's transitions are strictly downward
+ * @param cache
* @return
*/
private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
@@ -570,7 +578,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
StateTransitionThrottleController throttleController,
PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
Map<Partition, Map<String, String>> currentStateMap, boolean onlyDownwardLoadBalance,
- StateModelDefinition stateModelDef) {
+ StateModelDefinition stateModelDef, ResourceControllerDataProvider cache) {
String resourceName = resource.getResourceName();
Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
@@ -608,7 +616,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
throttleStateTransitionsForPartition(throttleController, resourceName, partition,
currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
+ intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
}
LogUtil.logInfo(logger, _eventId, String.format(
"For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
@@ -628,12 +636,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
* @param partitionsThrottled
* @param intermediatePartitionStateMap
* @param rebalanceType
+ * @param cache
*/
private void throttleStateTransitionsForPartition(
StateTransitionThrottleController throttleController, String resourceName,
Partition partition, CurrentStateOutput currentStateOutput,
PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
- PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType) {
+ PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
+ ResourceControllerDataProvider cache) {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -655,7 +665,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
for (String instance : allInstances) {
String currentState = currentStateMap.get(instance);
String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null && !bestPossibleState.equals(currentState))
{
+ if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+ && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+ .contains(instance)) {
if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
hasReachedThrottlingLimit = true;
if (logger.isDebugEnabled()) {
@@ -669,24 +681,46 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
}
if (!hasReachedThrottlingLimit) {
- // This implies that there is room for more pending states. Find
- // instances with a replica whose current state is different from BestPossibleState and
+ // This implies that there is room for more state transitions.
+ // Find instances with a replica whose current state is different from BestPossibleState and
// "charge" for it, and bestPossibleStates will become intermediate states
intermediateMap.putAll(bestPossibleMap);
+ boolean shouldChargeForPartition = false;
for (String instance : allInstances) {
String currentState = currentStateMap.get(instance);
String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null && !bestPossibleState.equals(currentState))
{
+ if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+ && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+ .contains(instance)) {
throttleController.chargeInstance(rebalanceType, instance);
+ shouldChargeForPartition = true;
}
}
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
+ if (shouldChargeForPartition) {
+ throttleController.chargeCluster(rebalanceType);
+ throttleController.chargeResource(rebalanceType, resourceName);
+ }
} else {
- // No more room for more pending states; current states will just become intermediate states
+ // No more room for more state transitions; current states will just become intermediate
+ // states unless the partition is disabled
// Add this partition to a set of throttled partitions
- intermediateMap.putAll(currentStateMap);
- partitionsThrottled.add(partition);
+ for (String instance : allInstances) {
+ String currentState = currentStateMap.get(instance);
+ String bestPossibleState = bestPossibleMap.get(instance);
+ if (bestPossibleState != null && !bestPossibleState.equals(currentState)
+ && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+ .contains(instance)) {
+ // Because this partition is disabled, we allow assignment
+ intermediateMap.put(instance, bestPossibleState);
+ } else {
+ // Otherwise (the instance is not disabled for this partition, or no transition is
+ // needed), pass on the current state so that this replica is throttled
+ if (currentState != null) {
+ intermediateMap.put(instance, currentState);
+ }
+ partitionsThrottled.add(partition);
+ }
+ }
}
intermediatePartitionStateMap.setState(partition, intermediateMap);
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index 73bdcf8..fa0fe6e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -169,7 +170,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
* and no throttle config set for recovery balance
* and throttle config of 1 set for load balance,
* Instead of disabling the instance, we disable the partition in the instance config.
- * Here, we set the recovery balance config. Then we should still see the MASTER.
+ * Here, we set the recovery balance config to 0, but we should still see the downward
+ * transition regardless.
* * instance 1 : S (M->S->Offline)
* * instance 2 : M (S->M because it's in recovery)
* * instance 3 : S
@@ -205,7 +207,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
.entrySet()) {
if (partitionState.getKey().equals("TestDB0_2")) {
- Assert.assertTrue(partitionState.getValue().equals("MASTER"));
+ Assert.assertFalse(partitionState.getValue().equals("MASTER"));
}
}
}
@@ -219,6 +221,85 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
deleteCluster(_clusterName);
}
+  /**
+   * Disables one instance and checks that the controller still sends it more than one message
+   * even though every throttle limit is set to 1, i.e. transitions on a disabled instance
+   * bypass throttling.
+   */
+  @Test
+  public void testNoThrottleOnDisabledInstance() throws Exception {
+    int participantCount = 5;
+    setupEnvironment(participantCount);
+    ClusterControllerManager controller = null;
+    try {
+      setThrottleConfig();
+
+      // Disable an instance so that it will not be subject to throttling
+      PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
+      InstanceConfig instanceConfig = _accessor.getProperty(key);
+      instanceConfig.setInstanceEnabled(false);
+      _accessor.setProperty(key, instanceConfig);
+
+      // Set the state transition delay so that transitions would be processed slowly
+      DelayedTransitionBase.setDelay(1000000L);
+
+      // Resume the controller
+      controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+      controller.syncStart();
+      // NOTE(review): a fixed sleep may be flaky on slow machines; consider polling with a timeout
+      Thread.sleep(500L);
+
+      // Check that there are more messages on this Participant despite the throttle config set
+      // at 1
+      Assert.assertTrue(verifyMultipleMessages(_participants[0]));
+    } finally {
+      // Always clean up, even when an assertion fails, so the controller, participants and
+      // cluster do not leak into the next test in this class
+      if (controller != null) {
+        controller.syncStop();
+      }
+      for (int i = 0; i < participantCount; i++) {
+        _participants[i].syncStop();
+      }
+      deleteCluster(_clusterName);
+    }
+  }
+
+  /**
+   * Disables one partition of the existing resource on every instance, adds a new resource, and
+   * checks that each instance receives 2 messages even though every throttle limit is set to 1,
+   * i.e. the transition for the disabled partition bypasses throttling.
+   */
+  @Test
+  public void testNoThrottleOnDisabledPartition() throws Exception {
+    int participantCount = 3;
+    setupEnvironment(participantCount);
+    ClusterControllerManager controller = null;
+    try {
+      setThrottleConfig();
+
+      // Disable a partition so that it will not be subject to throttling
+      String partitionName = _resourceName + "0_0";
+      for (int i = 0; i < participantCount; i++) {
+        disablePartitionOnInstance(_participants[i], _resourceName + "0", partitionName);
+      }
+
+      String newResource = "abc";
+      IdealState idealState = new FullAutoModeISBuilder(newResource).setStateModel("MasterSlave")
+          .setStateModelFactoryName("DEFAULT").setNumPartitions(5).setNumReplica(3)
+          .setMinActiveReplica(2).setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO)
+          .setRebalancerClass("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer")
+          .setRebalanceStrategy(
+              "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy")
+          .build();
+
+      _gSetupTool.addResourceToCluster(_clusterName, newResource, idealState);
+      _gSetupTool.rebalanceStorageCluster(_clusterName, newResource, 3);
+
+      // Set the state transition delay so that transitions would be processed slowly
+      DelayedTransitionBase.setDelay(1000000L);
+
+      // Once the controller starts, Helix will try to bring replicas up on all instances, but the
+      // disabled partition will go to OFFLINE. Because the disabled partition bypasses
+      // throttling, each instance should hold 2 messages despite the throttle being set at 1
+      controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+      controller.syncStart();
+      // NOTE(review): a fixed sleep may be flaky on slow machines; consider polling with a timeout
+      Thread.sleep(500L);
+
+      for (MockParticipantManager participantManager : _participants) {
+        Assert.assertTrue(verifyTwoMessages(participantManager));
+      }
+    } finally {
+      // Always clean up, even when an assertion fails, so the controller, participants and
+      // cluster do not leak into the next test in this class
+      if (controller != null) {
+        controller.syncStop();
+      }
+      for (int i = 0; i < participantCount; i++) {
+        _participants[i].syncStop();
+      }
+      deleteCluster(_clusterName);
+    }
+  }
+
/**
* Set up the cluster and pause the controller.
* @param participantCount
@@ -253,6 +334,37 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
}
/**
+ * Set all throttle configs at 1 so that we could test by observing the number of ongoing
+ * transitions.
+ */
+  private void setThrottleConfig() {
+    // Use a single key builder for both the read and the write of the cluster config
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    ClusterConfig clusterConfig = _accessor.getProperty(keyBuilder.clusterConfig());
+    clusterConfig.setResourcePriorityField("Name");
+
+    List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+
+    // Add throttling at cluster-level: a limit of 1 for recovery, load and any rebalance type
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.ANY,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+
+    // Add throttling at instance level: a limit of 1 for any rebalance type
+    throttleConfigs.add(new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.ANY,
+        StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1));
+
+    clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+    _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+  }
+
+ /**
* Set throttle limits only for load balance so that none of them would happen.
*/
private void setThrottleConfigForLoadBalance() {
@@ -347,4 +459,32 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase {
instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, false);
_accessor.setProperty(key, instanceConfig);
}
+
+  /**
+   * Checks whether the given Participant currently has more than one message.
+   * @param participant the participant whose messages are counted
+   * @return true if more than one message is listed; false otherwise, including when no child
+   *         names are returned
+   */
+  private boolean verifyMultipleMessages(final MockParticipantManager participant) {
+    return getMessageCount(participant) > 1;
+  }
+
+  /**
+   * Checks whether the given Participant currently has exactly two messages.
+   * @param participant the participant whose messages are counted
+   * @return true if exactly two messages are listed; false otherwise, including when no child
+   *         names are returned
+   */
+  private boolean verifyTwoMessages(final MockParticipantManager participant) {
+    return getMessageCount(participant) == 2;
+  }
+
+  /**
+   * Counts the messages listed under the given Participant's instance.
+   * @param participant the participant whose messages are counted
+   * @return the number of message names, or 0 when getChildNames returns null
+   */
+  private int getMessageCount(final MockParticipantManager participant) {
+    PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName());
+    List<String> messageNames = _accessor.getChildNames(key);
+    return messageNames == null ? 0 : messageNames.size();
+  }
}
|