[HELIX-704] Refactor IntermediateState and throttling code
Refactor IntermediateState and throttling code with comments.
Changelist:
1. Add clear comments/JavaDoc/log messages
2. Rename confusing variable names and typos
3. A few small micro-optimizations
There is no change in the code logic.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/323fbd04
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/323fbd04
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/323fbd04
Branch: refs/heads/master
Commit: 323fbd049ed84a17ca8a2a00019bf76d51f5346e
Parents: 0d77cba
Author: Hunter Lee <narendly@gmail.com>
Authored: Mon Jun 25 14:55:07 2018 -0700
Committer: Hunter Lee <narendly@gmail.com>
Committed: Mon Jun 25 14:55:42 2018 -0700
----------------------------------------------------------------------
.../config/StateTransitionThrottleConfig.java | 27 +-
.../stages/IntermediateStateCalcStage.java | 606 +++++++++++--------
.../StateTransitionThrottleController.java | 187 +++---
3 files changed, 481 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index c927a68..4955b86 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -48,7 +48,7 @@ public class StateTransitionThrottleConfig {
public enum RebalanceType {
LOAD_BALANCE,
RECOVERY_BALANCE,
- ANY,
+ ANY, // A type used for general throttling (to account for all types of rebalance)
NONE
}
@@ -56,8 +56,8 @@ public class StateTransitionThrottleConfig {
ThrottleScope _throttleScope;
Long _maxPartitionInTransition;
- public StateTransitionThrottleConfig(RebalanceType rebalanceType,
- ThrottleScope throttleScope, long maxPartitionInTransition) {
+ public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope,
+ long maxPartitionInTransition) {
_rebalanceType = rebalanceType;
_throttleScope = throttleScope;
_maxPartitionInTransition = maxPartitionInTransition;
@@ -79,11 +79,10 @@ public class StateTransitionThrottleConfig {
/**
* Generate the JSON String for StateTransitionThrottleConfig.
- *
* @return Json String for this config.
*/
public String toJSON() {
- Map<String, String> configMap = new HashMap<String, String>();
+ Map<String, String> configMap = new HashMap<>();
configMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
configMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
configMap.put(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name(),
@@ -94,7 +93,7 @@ public class StateTransitionThrottleConfig {
ObjectWriter objectWriter = OBJECT_MAPPER.writer();
jsonStr = objectWriter.writeValueAsString(configMap);
} catch (IOException e) {
- logger.error("Failed to convert config map to JSON object! " + configMap);
+ logger.error("Failed to convert config map to JSON object! {}", configMap);
}
return jsonStr;
@@ -102,9 +101,9 @@ public class StateTransitionThrottleConfig {
/**
* Instantiate a throttle config from a config JSON string.
- *
* @param configJsonStr
- * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+ * @return StateTransitionThrottleConfig or null if the given configs map is not a valid
+ * StateTransitionThrottleConfig.
*/
public static StateTransitionThrottleConfig fromJSON(String configJsonStr) {
StateTransitionThrottleConfig throttleConfig = null;
@@ -113,23 +112,21 @@ public class StateTransitionThrottleConfig {
Map<String, String> configsMap = objectReader.readValue(configJsonStr);
throttleConfig = fromConfigMap(configsMap);
} catch (IOException e) {
- logger.error("Failed to convert JSON string to config map! " + configJsonStr);
+ logger.error("Failed to convert JSON string to config map! {}", configJsonStr);
}
return throttleConfig;
}
/**
- * Instantiate a throttle config from a config map
- *
+ * Instantiate a throttle config from a config map.
* @param configsMap
- *
* @return StateTransitionThrottleConfig or null if the given configs map is not a valid
- * StateTransitionThrottleConfig.
+ * StateTransitionThrottleConfig.
*/
public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
- if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || !configsMap
- .containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+ if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name())
+ || !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
// not a valid StateTransitionThrottleConfig
return null;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 166cc63..ba5a29a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -35,17 +35,16 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
- * For partition compute the Intermediate State (instance,state) pair based on the BestPossible
- * State and Current State, with all constraints applied (such as state transition throttling).
+ * For partition compute the Intermediate State (instance,state) pair based on the BestPossibleState
+ * and CurrentState, with all constraints applied (such as state transition throttling).
*/
public class IntermediateStateCalcStage extends AbstractBaseStage {
- private static final Logger logger = LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+ private static final Logger logger =
+ LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.name());
-
+ CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
@@ -53,15 +52,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
|| cache == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires CURRENT_STATE|BEST_POSSIBLE_STATE|RESOURCES|DataCache");
+ throw new StageException(String.format("Missing attributes in event: %s. "
+ + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)",
+ event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
}
IntermediateStateOutput intermediateStateOutput =
compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput);
- // Check whether any instance in the cluster could be assigned more partitions than allowed, if yes, pause the rebalancer.
+ // Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance. If
+ // it does, pause the rebalance and put the cluster in maintenance mode.
int maxPartitionPerInstance = cache.getClusterConfig().getMaxPartitionsPerInstance();
if (maxPartitionPerInstance > 0) {
validateMaxPartitionsPerInstance(event, cache, intermediateStateOutput,
@@ -69,76 +70,93 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
}
+ /**
+ * Go through each resource and, based on BestPossibleState and CurrentState, compute an
+ * IntermediateState that is as close to BestPossibleState as possible while maintaining
+ * throttling constraints (for example, ensuring that the number of possible pending state
+ * transitions does NOT go over the set threshold).
+ * @param event
+ * @param resourceMap
+ * @param currentStateOutput
+ * @param bestPossibleStateOutput
+ * @return the IntermediateStateOutput holding the computed partition state map of each resource
+ */
private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
- // for each resource
- // get the best possible state and current state
- // try to bring immediate state close to best possible state until
- // the possible pending state transition numbers reach the set throttle number.
IntermediateStateOutput output = new IntermediateStateOutput();
ClusterDataCache dataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
- StateTransitionThrottleController throttleController =
- new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
- dataCache.getLiveInstances().keySet());
+ StateTransitionThrottleController throttleController = new StateTransitionThrottleController(
+ resourceMap.keySet(), dataCache.getClusterConfig(), dataCache.getLiveInstances().keySet());
- // Resource level prioritization with numerical sortable field.
- // If no value has been set, it will be treated as lowest priority.
- List<ResourcePriority> prioritizedResourceList = new ArrayList<ResourcePriority>();
+ // Resource level prioritization based on the numerical (sortable) priority field.
+ // If the resource priority field is null/not set, the resource will be treated as lowest
+ // priority.
+ List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
for (String resourceName : resourceMap.keySet()) {
prioritizedResourceList.add(new ResourcePriority(resourceName, Integer.MIN_VALUE));
}
- // Not have resource level prioritization if user did not set the field name
+ // If resourcePriorityField is null at the cluster level, all resources will be considered equal
+ // in priority by keeping all priorities at MIN_VALUE
if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
String priorityField = dataCache.getClusterConfig().getResourcePriorityField();
-
for (ResourcePriority resourcePriority : prioritizedResourceList) {
String resourceName = resourcePriority.getResourceName();
// Will take the priority from ResourceConfig first
// If ResourceConfig does not exist or does not have this field.
- // Try to fetch it from ideal state. Otherwise will treated as lowest priority
+ // Try to load it from the resource's IdealState. Otherwise, keep it at the lowest priority
if (dataCache.getResourceConfig(resourceName) != null
&& dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) != null) {
resourcePriority.setPriority(
dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
- } else if (dataCache.getIdealState(resourceName) != null
- && dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField)
- != null) {
-
+ } else if (dataCache.getIdealState(resourceName) != null && dataCache
+ .getIdealState(resourceName).getRecord().getSimpleField(priorityField) != null) {
resourcePriority.setPriority(
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
}
}
-
- Collections.sort(prioritizedResourceList, new ResourcePriortiyComparator());
+ Collections.sort(prioritizedResourceList, new ResourcePriorityComparator());
}
- // Update cluster status monitor mbean
- ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ // Fetch the ClusterStatusMonitor MBean from the event
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ // Resource priority is applied to the assignment computation by looping over the resources in
+ // order of decreasing priority
for (ResourcePriority resourcePriority : prioritizedResourceList) {
String resourceName = resourcePriority.getResourceName();
Resource resource = resourceMap.get(resourceName);
IdealState idealState = dataCache.getIdealState(resourceName);
-
if (idealState == null) {
- // if ideal state is deleted, use an empty one
- logger.info("resource:" + resourceName + " does not exist anymore");
+ // If IdealState is null, use an empty one
+ logger.info("IdealState for resource {} does not exist; resource may not exist anymore",
+ resourceName);
idealState = new IdealState(resourceName);
idealState.setStateModelDefRef(resource.getStateModelDefRef());
}
-
- PartitionStateMap intermediatePartitionStateMap =
- computeIntermediatePartitionState(dataCache, clusterStatusMonitor, idealState,
- resourceMap.get(resourceName), currentStateOutput,
- bestPossibleStateOutput.getPartitionStateMap(resourceName),
- bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController);
+ PartitionStateMap intermediatePartitionStateMap = computeIntermediatePartitionState(dataCache,
+ clusterStatusMonitor, idealState, resourceMap.get(resourceName), currentStateOutput,
+ bestPossibleStateOutput.getPartitionStateMap(resourceName),
+ bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController);
output.setState(resourceName, intermediatePartitionStateMap);
}
return output;
}
+ /**
+ * Go through every instance in the assignment and check that each instance does NOT have more
+ * replicas for partitions assigned to it than maxPartitionsPerInstance. If the assignment
+ * violates this, put the cluster in maintenance mode and throw a HelixException.
+ * This logic could be integrated with compute() for IntermediateState calculation, but it is kept
+ * separate for visibility and testing. Additionally, performing validation after compute()
+ * ensures that the full intermediate state mapping is complete prior to validation.
+ * @param event
+ * @param cache
+ * @param intermediateStateOutput
+ * @param maxPartitionPerInstance
+ */
private void validateMaxPartitionsPerInstance(ClusterEvent event, ClusterDataCache cache,
IntermediateStateOutput intermediateStateOutput, int maxPartitionPerInstance) {
Map<String, PartitionStateMap> resourceStatesMap =
@@ -147,9 +165,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
for (String resource : resourceStatesMap.keySet()) {
IdealState idealState = cache.getIdealState(resource);
- if (idealState != null && idealState.getStateModelDefRef()
- .equals(BuiltInStateModelDefinitions.Task.name())) {
- // ignore task here. Task has its own throttling logic
+ if (idealState != null
+ && idealState.getStateModelDefRef().equals(BuiltInStateModelDefinitions.Task.name())) {
+ // Ignore task here. Task has its own throttling logic
continue;
}
@@ -158,7 +176,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
for (Partition p : stateMaps.keySet()) {
Map<String, String> stateMap = stateMaps.get(p);
for (String instance : stateMap.keySet()) {
- //ignore replica to be dropped.
+ // If this replica is in DROPPED state, do not count it in the partition count since it is
+ // to be dropped
String state = stateMap.get(instance);
if (state.equals(HelixDefinedState.DROPPED.name())) {
continue;
@@ -167,18 +186,25 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
if (!instancePartitionCounts.containsKey(instance)) {
instancePartitionCounts.put(instance, 0);
}
- int partitionCount = instancePartitionCounts.get(instance);
+ int partitionCount = instancePartitionCounts.get(instance); // Number of replicas (from
+ // different partitions) held
+ // in this instance
partitionCount++;
if (partitionCount > maxPartitionPerInstance) {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
String errMsg = String.format(
- "Partition count to be assigned to instance %s is greater than %d. Stop rebalance and pause the cluster %s",
+ "Problem: according to this assignment, instance %s contains more "
+ + "replicas/partitions than the maximum number allowed (%d). Pipeline will "
+ + "stop the rebalance and pause the cluster %s",
instance, maxPartitionPerInstance, cache.getClusterName());
if (manager != null) {
- manager.getClusterManagmentTool()
- .enableMaintenanceMode(manager.getClusterName(), true, errMsg);
+ manager.getClusterManagmentTool().enableMaintenanceMode(manager.getClusterName(),
+ true, errMsg);
} else {
- logger.error("Failed to pause cluster, HelixManager is not set!");
+ logger.error(
+ "HelixManager is not set/null! Failed to pause this cluster/enable maintenance"
+ + " mode due to an instance being assigned more replicas/partitions than "
+ + "the limit.");
}
throw new HelixException(errMsg);
}
@@ -188,28 +214,40 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
}
+ /**
+ * Compute intermediate partition states for a prioritized resource.
+ * @param cache
+ * @param clusterStatusMonitor
+ * @param idealState
+ * @param resource
+ * @param currentStateOutput
+ * @param bestPossiblePartitionStateMap
+ * @param preferenceLists
+ * @param throttleController
+ * @return the intermediate partition state map, or the best possible map if throttling is skipped
+ */
private PartitionStateMap computeIntermediatePartitionState(ClusterDataCache cache,
ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, Resource resource,
CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap,
Map<String, List<String>> preferenceLists,
StateTransitionThrottleController throttleController) {
String resourceName = resource.getResourceName();
- logger.debug("Processing resource:" + resourceName);
+ logger.debug("Processing resource: {}", resourceName);
- if (!throttleController.isThrottleEnabled() || !IdealState.RebalanceMode.FULL_AUTO
- .equals(idealState.getRebalanceMode()) || cache.isTaskCache()) {
- // We only apply throttling on FULL-AUTO now.
+ // Throttling is applied only when enabled, on FULL_AUTO resources, and not on the task cache
+ if (!throttleController.isThrottleEnabled()
+ || !IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())
+ || cache.isTaskCache()) {
return bestPossiblePartitionStateMap;
}
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
-
PartitionStateMap intermediatePartitionStateMap = new PartitionStateMap(resourceName);
Set<Partition> partitionsNeedRecovery = new HashSet<>();
- Set<Partition> partitionsNeedLoadbalance = new HashSet<>();
- Set<Partition> partitionshaveErrorStateReplica = new HashSet<>();
+ Set<Partition> partitionsNeedLoadBalance = new HashSet<>();
+ Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
for (Partition partition : resource.getPartitions()) {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -217,93 +255,98 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
bestPossiblePartitionStateMap.getPartitionMap(partition);
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
- RebalanceType rebalanceType =
- getRebalanceType(cache, bestPossibleMap, preferenceList, stateModelDef, currentStateMap,
- idealState);
+ RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap, preferenceList,
+ stateModelDef, currentStateMap, idealState);
- // TODO refine getRebalanceType to return more accurate rebalance types.
- // So following logic doesn't need to check for more details.
- boolean rebalanceNeeded = false;
+ // TODO: refine getRebalanceType to return more accurate rebalance types. So the following
+ // logic doesn't need to check for more details.
+ boolean isRebalanceNeeded = false;
+
+ // Number of states required by StateModelDefinition are not satisfied, need recovery
if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
- // Check if any error exist
+ // Check whether any replica of this partition is in ERROR state
if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
- partitionshaveErrorStateReplica.add(partition);
+ partitionsWithErrorStateReplica.add(partition);
}
// Check if recovery is needed for this partition
if (!currentStateMap.equals(bestPossibleMap)) {
partitionsNeedRecovery.add(partition);
- rebalanceNeeded = true;
- } // else, if currentState == bestPossibleState, no rebalance needed
+ isRebalanceNeeded = true;
+ }
+ // Number of states required by StateModelDefinition are satisfied, but to achieve
+ // BestPossibleState, need load balance
} else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
- partitionsNeedLoadbalance.add(partition);
- rebalanceNeeded = true;
+ partitionsNeedLoadBalance.add(partition);
+ isRebalanceNeeded = true;
}
-
- if (!rebalanceNeeded) {
- // no rebalance needed.
+ // Currently at BestPossibleState, no further action necessary
+ if (!isRebalanceNeeded) {
Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap);
intermediatePartitionStateMap.setState(partition, intermediateMap);
}
}
if (!partitionsNeedRecovery.isEmpty()) {
- logger.info(
- "recovery balance needed for " + resourceName + " partitions: " + partitionsNeedRecovery);
+ logger.info("Recovery balance needed for {} partitions: {}", resourceName,
+ partitionsNeedRecovery);
}
- if (!partitionsNeedLoadbalance.isEmpty()) {
- logger.info(
- "load balance needed for " + resourceName + " partitions: " + partitionsNeedLoadbalance);
+ if (!partitionsNeedLoadBalance.isEmpty()) {
+ logger.info("Load balance needed for {} partitions: {}", resourceName,
+ partitionsNeedLoadBalance);
}
- if (!partitionshaveErrorStateReplica.isEmpty()) {
- logger.info("partition currently has ERROR replica in " + resourceName + " partitions: "
- + partitionshaveErrorStateReplica);
+ if (!partitionsWithErrorStateReplica.isEmpty()) {
+ logger.info("Partition currently has an ERROR replica in {} partitions: {}", resourceName,
+ partitionsWithErrorStateReplica);
}
chargePendingTransition(resource, currentStateOutput, throttleController,
- partitionsNeedRecovery, partitionsNeedLoadbalance);
+ partitionsNeedRecovery, partitionsNeedLoadBalance);
- // perform recovery rebalance
+ // Perform recovery balance
Set<Partition> recoveryThrottledPartitions =
- recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
- intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
- cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
-
- Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadbalance;
-
- long maxAllowedErrorPartitions = cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance();
- if (partitionsNeedRecovery.isEmpty() &&
- (maxAllowedErrorPartitions < 0
- || partitionshaveErrorStateReplica.size() <= maxAllowedErrorPartitions)) {
- // perform load balance only if
- // 1. no recovery operation to be scheduled.
- // 2. error partition count is less than configured limitation.
- loadbalanceThrottledPartitions =
- loadRebalance(resource, currentStateOutput, bestPossiblePartitionStateMap,
- throttleController, intermediatePartitionStateMap, partitionsNeedLoadbalance,
- currentStateOutput.getCurrentStateMap(resourceName));
+ recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
+ intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
+ cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
+
+ // Perform load balance
+ Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadBalance;
+
+ long maxAllowedErrorPartitions =
+ cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance();
+
+ // TODO: Logic here needs change - when there is an error partition, load should still happen
+
+ // Perform load balance only if
+ // 1. no recovery operation to be scheduled
+ // 2. error partition count does not exceed the configured threshold (or the threshold is < 0)
+ if (partitionsNeedRecovery.isEmpty() && (maxAllowedErrorPartitions < 0
+ || partitionsWithErrorStateReplica.size() <= maxAllowedErrorPartitions)) {
+ loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
+ bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
+ partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName));
} else {
- // skip load balance, use current state mapping
- for (Partition p : partitionsNeedLoadbalance) {
+ // Skip load balance and current states just become intermediate states
+ for (Partition partition : partitionsNeedLoadBalance) {
Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, p);
- intermediatePartitionStateMap.setState(p, currentStateMap);
+ currentStateOutput.getCurrentStateMap(resourceName, partition);
+ intermediatePartitionStateMap.setState(partition, currentStateMap);
}
}
if (clusterStatusMonitor != null) {
clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(),
- partitionsNeedLoadbalance.size(), recoveryThrottledPartitions.size(),
+ partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
loadbalanceThrottledPartitions.size());
}
if (logger.isDebugEnabled()) {
- logParitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
- partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadbalance,
+ logPartitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
+ partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadBalance,
loadbalanceThrottledPartitions, currentStateOutput, bestPossiblePartitionStateMap,
intermediatePartitionStateMap);
}
- logger.debug("End processing resource:" + resourceName);
+ logger.debug("End processing resource: {}", resourceName);
return intermediatePartitionStateMap;
}
@@ -317,8 +360,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// check and charge pending transitions
for (Partition partition : resource.getPartitions()) {
+ // Maps instance to its current state
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
+ // Maps instance to its pending (next) state
Map<String, String> pendingMap =
currentStateOutput.getPendingStateMap(resourceName, partition);
@@ -334,11 +379,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
throttleController.chargeResource(rebalanceType, resourceName);
// charge each instance.
- for (String ins : pendingMap.keySet()) {
- String currentState = currentStateMap.get(ins);
- String pendingState = pendingMap.get(ins);
+ for (String instance : pendingMap.keySet()) {
+ String currentState = currentStateMap.get(instance);
+ String pendingState = pendingMap.get(instance);
if (pendingState != null && !pendingState.equals(currentState)) {
- throttleController.chargeInstance(rebalanceType, ins);
+ throttleController.chargeInstance(rebalanceType, instance);
}
}
}
@@ -346,137 +391,173 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
/**
- * Perform any recovery balance if needed, fill intermediatePartitionStateMap
- * if recover rebalance is needed.
- * return the partitions needs recoveryRebalance but get throttled
+ * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
+ * each partition, throttle state transitions if needed. Also populate
+ * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
+ * CurrentState (if throttled).
+ * @param resource
+ * @param bestPossiblePartitionStateMap
+ * @param throttleController
+ * @param intermediatePartitionStateMap
+ * @param partitionsNeedRecovery
+ * @param currentStateOutput
+ * @param topState
+ * @return a set of partitions that need recovery but did not get recovered due to throttling
*/
- private Set<Partition> recoveryRebalance(Resource resource, PartitionStateMap bestPossiblePartitionStateMap,
+ private Set<Partition> recoveryRebalance(Resource resource,
+ PartitionStateMap bestPossiblePartitionStateMap,
StateTransitionThrottleController throttleController,
PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery,
CurrentStateOutput currentStateOutput, String topState) {
- Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<Partition>();
+ String resourceName = resource.getResourceName();
+ Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
+ // Maps Partition -> Instance -> State
Map<Partition, Map<String, String>> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName());
- List<Partition> partitionsNeedRecoveryPrioritized =
- new ArrayList<Partition>(partitionsNeedRecovery);
+ currentStateOutput.getCurrentStateMap(resourceName);
+ List<Partition> partitionsNeedRecoveryPrioritized = new ArrayList<>(partitionsNeedRecovery);
- // TODO : Due to currently using JAVA 1.6, the original order of partitions list is not
- // determinable, sort the list by partition name and remove the code after bump to JAVA 1.8
+ // TODO: Remove this sort by partition name when Java 1.8 is used
+ // We want the result of the intermediate state calculation to be deterministic. We sort here by
+ // partition name to ensure that the order is consistent for inputs fed into
+ // PartitionPriorityComparator sort
Collections.sort(partitionsNeedRecoveryPrioritized, new Comparator<Partition>() {
@Override
- public int compare(Partition o1, Partition o2) {
- return o1.getPartitionName().compareTo(o2.getPartitionName());
+ public int compare(Partition partition1, Partition partition2) {
+ return partition1.getPartitionName().compareTo(partition2.getPartitionName());
}
});
+ Collections.sort(partitionsNeedRecoveryPrioritized, new PartitionPriorityComparator(
+ bestPossiblePartitionStateMap.getStateMap(), currentStateMap, topState, true));
- Collections.sort(partitionsNeedRecoveryPrioritized,
- new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
- currentStateMap, topState, true));
-
+ // For each partition, apply throttling if needed.
for (Partition partition : partitionsNeedRecoveryPrioritized) {
- throtteStateTransitions(throttleController, resource.getResourceName(), partition,
+ throttleStateTransitionsForPartition(throttleController, resourceName, partition,
currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled,
intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
}
-
- logger.info(String
- .format("%s: needRecovery: %d, recoverybalanceThrottled: %d", resource.getResourceName(),
- partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size()));
+ logger.info(String.format(
+ "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery but throttled (not recovered): %d",
+ resourceName, partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size()));
return partitionRecoveryBalanceThrottled;
}
- /* return the partitions needs loadRebalance but get throttled */
+ /**
+ * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
+ * each partition, throttle state transitions if needed. Also populate
+ * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
+ * CurrentState (if throttled).
+ * @param resource
+ * @param currentStateOutput
+ * @param bestPossiblePartitionStateMap
+ * @param throttleController
+ * @param intermediatePartitionStateMap
+ * @param partitionsNeedLoadbalance
+ * @param currentStateMaps
+ * @return a set of partitions that need to be load-balanced but were throttled
+ */
private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
PartitionStateMap bestPossiblePartitionStateMap,
StateTransitionThrottleController throttleController,
PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
Map<Partition, Map<String, String>> currentStateMaps) {
String resourceName = resource.getResourceName();
- Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>();
+ Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
List<Partition> partitionsNeedLoadRebalancePrioritized =
new ArrayList<>(partitionsNeedLoadbalance);
- // TODO : Due to currently using JAVA 1.6, the original order of partitions list is not
- // determinable, sort the list by partition name and remove the code after bump to JAVA 1.8
+ // TODO: Remove this sort by partition name when Java 1.8 is used
+ // We want the result of the intermediate state calculation to be deterministic. We sort here by
+ // partition name to ensure that the order is consistent for inputs fed into
+ // PartitionPriorityComparator sort
Collections.sort(partitionsNeedLoadRebalancePrioritized, new Comparator<Partition>() {
@Override
- public int compare(Partition o1, Partition o2) {
- return o1.getPartitionName().compareTo(o2.getPartitionName());
+ public int compare(Partition partition1, Partition partition2) {
+ return partition1.getPartitionName().compareTo(partition2.getPartitionName());
}
});
-
- Collections.sort(partitionsNeedLoadRebalancePrioritized,
- new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
- currentStateMaps, "", false));
-
+ Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator(
+ bestPossiblePartitionStateMap.getStateMap(), currentStateMaps, "", false));
for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
- throtteStateTransitions(throttleController, resourceName, partition, currentStateOutput,
- bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
+ throttleStateTransitionsForPartition(throttleController, resourceName, partition,
+ currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
}
-
- logger.info(String
- .format("%s: loadbalanceNeeded: %d, loadbalanceThrottled: %d", resource.getResourceName(), partitionsNeedLoadbalance.size(),
- partitionsLoadbalanceThrottled.size()));
-
- if (logger.isDebugEnabled()) {
- logger.debug("recovery balance throttled for " + resourceName + " partitions: "
- + partitionsLoadbalanceThrottled);
- }
-
+ logger.info(String.format(
+ "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
+ + " load-balance but throttled (not load-balanced): %d",
+ resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size()));
return partitionsLoadbalanceThrottled;
}
- private void throtteStateTransitions(StateTransitionThrottleController throttleController,
- String resourceName, Partition partition, CurrentStateOutput currentStateOutput,
+ /**
+ * Check the status on throttling at every level (cluster, resource, instance) and set
+ * intermediatePartitionStateMap accordingly per partition.
+ * @param throttleController
+ * @param resourceName
+ * @param partition
+ * @param currentStateOutput
+ * @param bestPossiblePartitionStateMap
+ * @param partitionsThrottled
+ * @param intermediatePartitionStateMap
+ * @param rebalanceType
+ */
+ private void throttleStateTransitionsForPartition(
+ StateTransitionThrottleController throttleController, String resourceName,
+ Partition partition, CurrentStateOutput currentStateOutput,
PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType) {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
- Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+ Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
allInstances.addAll(bestPossibleMap.keySet());
- Map<String, String> intermediateMap = new HashMap<String, String>();
+ Map<String, String> intermediateMap = new HashMap<>();
- boolean throttled = false;
- if (throttleController.throttleforResource(rebalanceType, resourceName)) {
- throttled = true;
+ boolean hasReachedThrottlingLimit = false;
+ if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
+ hasReachedThrottlingLimit = true;
if (logger.isDebugEnabled()) {
- logger
- .debug("Throttled on resource for " + resourceName + " " + partition.getPartitionName());
+ logger.debug("Throttled on partition: {} in resource: {}", partition.getPartitionName(),
+ resourceName);
}
} else {
- // throttle if any of the instance can not handle the state transition
- for (String ins : allInstances) {
- String currentState = currentStateMap.get(ins);
- String bestPossibleState = bestPossibleMap.get(ins);
+ // throttle if any of the instances are not able to accept state transitions
+ for (String instance : allInstances) {
+ String currentState = currentStateMap.get(instance);
+ String bestPossibleState = bestPossibleMap.get(instance);
if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
- if (throttleController.throttleForInstance(rebalanceType, ins)) {
- throttled = true;
+ if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
+ hasReachedThrottlingLimit = true;
if (logger.isDebugEnabled()) {
- logger.debug("Throttled because instance " + ins + " for " + resourceName + " " + partition
- .getPartitionName());
+ logger.debug(
+ "Throttled because of instance: {} for partition: {} in resource: {}", instance,
+ partition.getPartitionName(), resourceName);
}
+ break;
}
}
}
}
-
- if (!throttled) {
+ if (!hasReachedThrottlingLimit) {
+ // This implies that there is room for more pending states. Find
+ // instances with a replica whose current state is different from BestPossibleState and
+ // "charge" for it, and bestPossibleStates will become intermediate states
intermediateMap.putAll(bestPossibleMap);
- for (String ins : allInstances) {
- String currentState = currentStateMap.get(ins);
- String bestPossibleState = bestPossibleMap.get(ins);
+ for (String instance : allInstances) {
+ String currentState = currentStateMap.get(instance);
+ String bestPossibleState = bestPossibleMap.get(instance);
if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
- throttleController.chargeInstance(rebalanceType, ins);
+ throttleController.chargeInstance(rebalanceType, instance);
}
}
throttleController.chargeCluster(rebalanceType);
throttleController.chargeResource(rebalanceType, resourceName);
} else {
+ // No more room for more pending states; current states will just become intermediate states
+ // Add this partition to a set of throttled partitions
intermediateMap.putAll(currentStateMap);
partitionsThrottled.add(partition);
}
@@ -484,17 +565,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
/**
- * Given preferenceList, bestPossibleState and currentState, determine which type of rebalance is
- * needed to fit the idea states defined by the state model definition.
- *
- * @return rebalance type needed to bring the replicas to idea states
- * RECOVERY_BALANCE - required states is not available through all replicas
- * NONE - current state matches the idea state
- * LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the allocation
+ * For a partition, given its preferenceList, bestPossibleState, and currentState, determine which
+ * type of rebalance is needed to model IdealState's states defined by the state model definition.
+ * @return RebalanceType needed to bring the replicas to ideal states
+ * RECOVERY_BALANCE - not all required states (replicas) are available through all
+ * replicas
+ * NONE - current state matches the ideal state
+ * LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the
+ * allocation
*/
private RebalanceType getRebalanceType(ClusterDataCache cache,
Map<String, String> bestPossibleMap, List<String> preferenceList,
- StateModelDefinition stateModelDef, Map<String, String> currentStateMap, IdealState idealState) {
+ StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
+ IdealState idealState) {
if (preferenceList == null) {
preferenceList = Collections.emptyList();
}
@@ -503,80 +586,95 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
Set<String> activeList = new HashSet<>(preferenceList);
activeList.retainAll(cache.getEnabledLiveInstances());
- // Check current states against state model define. If doesn't match, need recovery.
+ // For each state, check that this partition currently has the required number of that state as
+ // required by StateModelDefinition.
LinkedHashMap<String, Integer> expectedStateCountMap =
- stateModelDef.getStateCountMap(activeList.size(), replica);
- Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap);
+ stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+ Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap); // Current
+ // counts
+ // Go through each state and compare counts
for (String state : expectedStateCountMap.keySet()) {
Integer expectedCount = expectedStateCountMap.get(state);
Integer currentCount = currentStateCounts.get(state);
- expectedCount = expectedCount == null? 0 : expectedCount;
- currentCount = currentCount == null? 0 : currentCount;
+ expectedCount = expectedCount == null ? 0 : expectedCount;
+ currentCount = currentCount == null ? 0 : currentCount;
+ // If counts do not match up, this partition requires recovery
if (currentCount < expectedCount) {
- if (!state.equals(HelixDefinedState.DROPPED.name()) &&
- !state.equals(HelixDefinedState.ERROR.name()) &&
- !state.equals(stateModelDef.getInitialState())) {
+ // Recovery is not needed in cases where this partition just started, was dropped, or is in
+ // error
+ if (!state.equals(HelixDefinedState.DROPPED.name())
+ && !state.equals(HelixDefinedState.ERROR.name())
+ && !state.equals(stateModelDef.getInitialState())) {
return RebalanceType.RECOVERY_BALANCE;
}
}
}
- // No recovery needed, all expected replicas exist.
- // Check if the calculated best possible states matches current states
+ // No recovery needed, all expected replicas exist
+ // Check if this partition is actually in the BestPossibleState
if (currentStateMap.equals(bestPossibleMap)) {
- return RebalanceType.NONE;
+ return RebalanceType.NONE; // No further action required
} else {
- // All other cases is categorized as load balance change
- return RebalanceType.LOAD_BALANCE;
+ return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to
+ // achieve BestPossibleState, load balance may be required
+ // to shift replicas around
}
}
- private void logParitionMapState(String resource, Set<Partition> allPartitions,
+ /**
+ * Log rebalancer metadata for debugging purposes. Nothing is logged unless debug is enabled.
+ * @param resource name of the resource, used to look up current states and pending messages
+ * @param allPartitions partitions whose per-partition state maps are logged
+ * @param recoveryPartitions partitions that need recovery
+ * @param recoveryThrottledPartitions partitions that got throttled on recovery
+ * @param loadbalancePartitions partitions that need load-balance
+ * @param loadbalanceThrottledPartitions partitions that got throttled on load-balance
+ * @param currentStateOutput source of each partition's current state and pending message maps
+ * @param bestPossibleStateMap source of each partition's best possible state map
+ * @param intermediateStateMap source of each partition's intermediate state map
+ */
+ private void logPartitionMapState(String resource, Set<Partition> allPartitions,
Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions,
Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions,
- CurrentStateOutput currentStateOutput,
- PartitionStateMap bestPossibleStateMap,
+ CurrentStateOutput currentStateOutput, PartitionStateMap bestPossibleStateMap,
PartitionStateMap intermediateStateMap) {
if (logger.isDebugEnabled()) {
- logger.debug("Partitions need recovery: " + recoveryPartitions
- + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions);
- logger.debug("Partitions need loadbalance: " + loadbalancePartitions
- + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions);
+ logger.debug("Partitions need recovery: {}\nPartitions get throttled on recovery: {}",
+ recoveryPartitions, recoveryThrottledPartitions);
+ logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: {}",
+ loadbalancePartitions, loadbalanceThrottledPartitions);
}
for (Partition partition : allPartitions) {
if (logger.isDebugEnabled()) {
- logger.debug(
- partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition));
- logger.debug(partition + ": Current State: " + currentStateOutput
- .getCurrentStateMap(resource, partition));
- logger.debug(partition + ": Pending state: " + currentStateOutput
- .getPendingMessageMap(resource, partition));
- logger.debug(
- partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition));
+ logger.debug("{}: Best possible map: {}", partition,
+ bestPossibleStateMap.getPartitionMap(partition));
+ logger.debug("{}: Current State: {}", partition,
+ currentStateOutput.getCurrentStateMap(resource, partition));
+ logger.debug("{}: Pending state: {}", partition,
+ currentStateOutput.getPendingMessageMap(resource, partition));
+ logger.debug("{}: Intermediate state: {}", partition,
+ intermediateStateMap.getPartitionMap(partition));
}
}
}
- private static class ResourcePriortiyComparator implements Comparator<ResourcePriority> {
- @Override public int compare(ResourcePriority r1, ResourcePriority r2) {
- return r2.compareTo(r1);
- }
- }
-
+ /**
+ * POJO that maps resource name to its priority represented by an integer.
+ */
private static class ResourcePriority {
private String _resourceName;
- private Integer _priority;
+ private int _priority;
- public ResourcePriority(String resourceName, Integer priority) {
+ ResourcePriority(String resourceName, Integer priority) {
_resourceName = resourceName;
_priority = priority;
}
public int compareTo(ResourcePriority resourcePriority) {
- return this._priority.compareTo(resourcePriority._priority);
+ return Integer.compare(_priority, resourcePriority._priority);
}
public String getResourceName() {
@@ -593,16 +691,23 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
}
- // Compare partitions according following standard:
- // 1) Partition without top state always is the highest priority.
- // 2) For partition with top-state, the more number of active replica it has, the less priority.
+ private static class ResourcePriorityComparator implements Comparator<ResourcePriority> {
+ @Override
+ public int compare(ResourcePriority priority1, ResourcePriority priority2) {
+ return priority2.compareTo(priority1);
+ }
+ }
+
+ // Compare partitions according to the following criteria:
+ // 1) A partition without a top-state replica always has the highest priority.
+ // 2) Among partitions with top state, the more active replicas, the lower the priority.
private class PartitionPriorityComparator implements Comparator<Partition> {
private Map<Partition, Map<String, String>> _bestPossibleMap;
private Map<Partition, Map<String, String>> _currentStateMap;
private String _topState;
private boolean _recoveryRebalance;
- public PartitionPriorityComparator(Map<Partition, Map<String, String>> bestPossibleMap,
+ PartitionPriorityComparator(Map<Partition, Map<String, String>> bestPossibleMap,
Map<Partition, Map<String, String>> currentStateMap, String topState,
boolean recoveryRebalance) {
_bestPossibleMap = bestPossibleMap;
@@ -614,50 +719,45 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
@Override
public int compare(Partition p1, Partition p2) {
if (_recoveryRebalance) {
- Integer missTopState1 = getMissTopStateIndex(p1);
- Integer missTopState2 = getMissTopStateIndex(p2);
-
+ int missTopState1 = getMissTopStateIndex(p1);
+ int missTopState2 = getMissTopStateIndex(p2);
// Highest priority for the partition without top state
- if (!missTopState1.equals(missTopState2)) {
- return missTopState1.compareTo(missTopState2);
+ if (missTopState1 != missTopState2) {
+ return Integer.compare(missTopState1, missTopState2);
}
-
- Integer currentActiveReplicas1 = getCurrentActiveReplicas(p1);
- Integer currentActiveReplicas2 = getCurrentActiveReplicas(p2);
- return currentActiveReplicas1.compareTo(currentActiveReplicas2);
+ // Higher priority for the partition with fewer active replicas
+ int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
+ int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+ return Integer.compare(currentActiveReplicas1, currentActiveReplicas2);
}
-
- // Higher priority for the partition, which has less number of active replicas
- Integer idealStateMatched1 = getIdealStateMatched(p1);
- Integer idealStateMatched2 = getIdealStateMatched(p2);
-
- return idealStateMatched1.compareTo(idealStateMatched2);
+ // Higher priority for the partition with fewer replicas with states matching with IdealState
+ int idealStateMatched1 = getIdealStateMatched(p1);
+ int idealStateMatched2 = getIdealStateMatched(p2);
+ return Integer.compare(idealStateMatched1, idealStateMatched2);
}
- private Integer getMissTopStateIndex(Partition partition) {
- // 0 if no replica in top-state, 1 if it has at least one replica in top-state.
- if (!_currentStateMap.containsKey(partition) || !_currentStateMap.get(partition).values()
- .contains(_topState)) {
+ private int getMissTopStateIndex(Partition partition) {
+ // 0 if no replicas in top-state, 1 if it has at least one replica in top-state.
+ if (!_currentStateMap.containsKey(partition)
+ || !_currentStateMap.get(partition).values().contains(_topState)) {
return 0;
}
return 1;
}
- private Integer getCurrentActiveReplicas(Partition partition) {
- Integer currentActiveReplicas = 0;
+ private int getCurrentActiveReplicas(Partition partition) {
+ int currentActiveReplicas = 0;
if (!_currentStateMap.containsKey(partition)) {
return currentActiveReplicas;
}
-
// Initialize state -> number of this state map
- Map<String, Integer> stateCountMap = new HashMap<String, Integer>();
+ Map<String, Integer> stateCountMap = new HashMap<>();
for (String state : _bestPossibleMap.get(partition).values()) {
if (!stateCountMap.containsKey(state)) {
stateCountMap.put(state, 0);
}
stateCountMap.put(state, stateCountMap.get(state) + 1);
}
-
// Search the state map
for (String state : _currentStateMap.get(partition).values()) {
if (stateCountMap.containsKey(state) && stateCountMap.get(state) > 0) {
@@ -665,16 +765,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
stateCountMap.put(state, stateCountMap.get(state) - 1);
}
}
-
return currentActiveReplicas;
}
- private Integer getIdealStateMatched(Partition partition) {
- Integer matchedState = 0;
+ private int getIdealStateMatched(Partition partition) {
+ int matchedState = 0;
if (!_currentStateMap.containsKey(partition)) {
return matchedState;
}
-
for (String instance : _bestPossibleMap.get(partition).keySet()) {
if (_bestPossibleMap.get(partition).get(instance)
.equals(_currentStateMap.get(partition).get(instance))) {
@@ -684,4 +782,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
return matchedState;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index cf1e10d..4a2baa7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -1,5 +1,24 @@
package org.apache.helix.controller.stages;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -11,31 +30,28 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Output for IntermediateStateCalStage.
+ * StateTransitionThrottleController is used to compute IntermediateState; it caches allowed
+ * transition counts to see if any state transitions depending on the rebalance type must be held
+ * off.
*/
class StateTransitionThrottleController {
private static Logger logger = LoggerFactory.getLogger(StateTransitionThrottleController.class);
// pending allowed transition counts in the cluster level for recovery and load balance
- Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
+ private Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
// pending allowed transition counts for each instance and resource
- Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
- _pendingTransitionAllowedPerInstance;
- Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
- _pendingTransitionAllowedPerResource;
+ private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerInstance;
+ private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerResource;
private boolean _throttleEnabled = false;
public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig,
Set<String> liveInstances) {
super();
- _pendingTransitionAllowedInCluster =
- new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>();
- _pendingTransitionAllowedPerInstance =
- new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
- _pendingTransitionAllowedPerResource =
- new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+ _pendingTransitionAllowedInCluster = new HashMap<>();
+ _pendingTransitionAllowedPerInstance = new HashMap<>();
+ _pendingTransitionAllowedPerResource = new HashMap<>();
if (clusterConfig == null) {
logger.warn("Cluster config is not found, no throttle config set!");
@@ -52,103 +68,112 @@ class StateTransitionThrottleController {
for (StateTransitionThrottleConfig config : throttleConfigs) {
switch (config.getThrottleScope()) {
- case CLUSTER:
- _pendingTransitionAllowedInCluster
- .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
- _throttleEnabled = true;
- break;
- case RESOURCE:
- for (String resource : resources) {
- if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
- _pendingTransitionAllowedPerResource
- .put(resource, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ case CLUSTER:
+ _pendingTransitionAllowedInCluster.put(config.getRebalanceType(),
+ config.getMaxPartitionInTransition());
+ _throttleEnabled = true;
+ break;
+ case RESOURCE:
+ for (String resource : resources) {
+ if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
+ _pendingTransitionAllowedPerResource.put(resource,
+ new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ }
+ _pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
+ config.getMaxPartitionInTransition());
}
- _pendingTransitionAllowedPerResource.get(resource)
- .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
- }
- _throttleEnabled = true;
- break;
- case INSTANCE:
- for (String instance : liveInstances) {
- if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
- _pendingTransitionAllowedPerInstance
- .put(instance, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ _throttleEnabled = true;
+ break;
+ case INSTANCE:
+ for (String instance : liveInstances) {
+ if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
+ _pendingTransitionAllowedPerInstance.put(instance,
+ new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ }
+ _pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
+ config.getMaxPartitionInTransition());
}
- _pendingTransitionAllowedPerInstance.get(instance)
- .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
- }
- _throttleEnabled = true;
- break;
+ _throttleEnabled = true;
+ break;
}
}
}
/**
- * Whether any throttle config enabled for this cluster.
- *
- * @return
+ * Returns the flag that indicates throttling is applied at any level (cluster, resource, and
+ * instance).
+ * @return true if throttling operation is present
*/
protected boolean isThrottleEnabled() {
return _throttleEnabled;
}
/**
- * Check if state transition on a partition should be throttled.
- *
- * @return true if it should be throttled, otherwise, false.
+ * Check if state transitions for a particular Rebalance type must be throttled. Assuming the
+ * "charging" already happened at this level, this method purely checks whether the throttle value
+ * has reached 0 or below.
+ * @return true if it should be throttled, otherwise, false
*/
- protected boolean throttleforCluster(
+ protected boolean shouldThrottleForCluster(
StateTransitionThrottleConfig.RebalanceType rebalanceType) {
- if (throttleForANYType(_pendingTransitionAllowedInCluster)) {
+ if (shouldThrottleForANYType(_pendingTransitionAllowedInCluster)) {
return true;
}
-
Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
- if (clusterThrottle != null) {
- if (clusterThrottle <= 0) {
- return true;
- }
- }
-
- return false;
+ return clusterThrottle != null && clusterThrottle <= 0;
}
- protected boolean throttleforResource(
+ /**
+ * Check if state transitions for a particular Rebalance type must be throttled at the resource
+ * level. Assuming the "charging" already happened at this level and the throttle value did not
+ * dip below 0 at the higher level, this method purely checks whether the throttle value has
+ * reached 0 or below.
+ * @return true if it should be throttled, otherwise, false
+ */
+ protected boolean shouldThrottleForResource(
StateTransitionThrottleConfig.RebalanceType rebalanceType, String resourceName) {
- if (throttleforCluster(rebalanceType)) {
+ if (shouldThrottleForCluster(rebalanceType)) {
return true;
}
-
Long resourceThrottle;
if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
resourceThrottle = _pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
- if (throttleForANYType(_pendingTransitionAllowedPerResource.get(resourceName)) || (
- resourceThrottle != null && resourceThrottle <= 0)) {
+ if (shouldThrottleForANYType(_pendingTransitionAllowedPerResource.get(resourceName))
+ || (resourceThrottle != null && resourceThrottle <= 0)) {
return true;
}
}
-
return false;
}
- protected boolean throttleForInstance(
+ /**
+ * Check if state transitions for a particular Rebalance type must be throttled at the instance
+ * level. Assuming the "charging" already happened at this level and the throttle value did not
+ * dip below 0 at the higher level, this method purely checks whether the throttle value has
+ * reached 0 or below.
+ * @return true if it should be throttled, otherwise, false
+ */
+ protected boolean shouldThrottleForInstance(
StateTransitionThrottleConfig.RebalanceType rebalanceType, String instanceName) {
- if (throttleforCluster(rebalanceType)) {
+ if (shouldThrottleForCluster(rebalanceType)) {
return true;
}
-
Long instanceThrottle;
if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
instanceThrottle = _pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
- if (throttleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName)) || (
- instanceThrottle != null && instanceThrottle <= 0)) {
+ if (shouldThrottleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName))
+ || (instanceThrottle != null && instanceThrottle <= 0)) {
return true;
}
}
-
return false;
}
+ /**
+ * "Charge" for a pending state for a particular Rebalance type by subtracting one pending state
+ * from number of total pending states allowed (set by user application).
+ * @param rebalanceType
+ */
protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) {
if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) {
Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
@@ -159,6 +184,11 @@ class StateTransitionThrottleController {
}
}
+ /**
+ * "Charge" the given resource for a pending state for a particular Rebalance type by subtracting
+ * one pending state from number of total pending states allowed (set by user application).
+ * @param rebalanceType
+ */
protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType,
String resource) {
if (_pendingTransitionAllowedPerResource.containsKey(resource)
@@ -171,6 +201,11 @@ class StateTransitionThrottleController {
}
}
+ /**
+ * "Charge" the given instance for a pending state for a particular Rebalance type by subtracting
+ * one pending state from number of total pending states allowed (set by user application).
+ * @param rebalanceType
+ */
protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType,
String instance) {
if (_pendingTransitionAllowedPerInstance.containsKey(instance)
@@ -183,7 +218,14 @@ class StateTransitionThrottleController {
}
}
- private boolean throttleForANYType(
+ /**
+ * Check if state transitions must be throttled overall regardless of the rebalance type.
+ * Assuming the "charging" already happened, this method purely checks whether the throttle value
+ * has reached 0 or below.
+ * @param pendingTransitionAllowed
+ * @return true if it should be throttled, otherwise, false
+ */
+ private boolean shouldThrottleForANYType(
Map<StateTransitionThrottleConfig.RebalanceType, Long> pendingTransitionAllowed) {
if (pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY)) {
Long anyTypeThrottle =
@@ -195,16 +237,21 @@ class StateTransitionThrottleController {
return false;
}
+ /**
+ * "Charge" for a pending state regardless of the rebalance type by subtracting one pending state
+ * from the number of total pending states allowed under the ANY rebalance type (set by user
+ * application).
+ * @param pendingTransitionAllowed
+ */
private void chargeANYType(
Map<StateTransitionThrottleConfig.RebalanceType, Long> pendingTransitionAllowed) {
if (pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY)) {
Long anyTypeThrottle =
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
if (anyTypeThrottle > 0) {
- pendingTransitionAllowed
- .put(StateTransitionThrottleConfig.RebalanceType.ANY, anyTypeThrottle - 1);
+ pendingTransitionAllowed.put(StateTransitionThrottleConfig.RebalanceType.ANY,
+ anyTypeThrottle - 1);
}
}
}
-}
-
+}
\ No newline at end of file
|