helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/2] helix git commit: [HELIX-704] Refactor IntermediateState and throttling code
Date Tue, 26 Jun 2018 22:10:37 GMT
[HELIX-704] Refactor IntermediateState and throttling code

Refactor the IntermediateState and throttling code and add explanatory comments.

Changelist:
1. Add clear comments, JavaDoc, and log messages
2. Rename confusing variables and fix typos
3. Make a few small micro-optimizations

There is no change in the code logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/323fbd04
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/323fbd04
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/323fbd04

Branch: refs/heads/master
Commit: 323fbd049ed84a17ca8a2a00019bf76d51f5346e
Parents: 0d77cba
Author: Hunter Lee <narendly@gmail.com>
Authored: Mon Jun 25 14:55:07 2018 -0700
Committer: Hunter Lee <narendly@gmail.com>
Committed: Mon Jun 25 14:55:42 2018 -0700

----------------------------------------------------------------------
 .../config/StateTransitionThrottleConfig.java   |  27 +-
 .../stages/IntermediateStateCalcStage.java      | 606 +++++++++++--------
 .../StateTransitionThrottleController.java      | 187 +++---
 3 files changed, 481 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index c927a68..4955b86 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -48,7 +48,7 @@ public class StateTransitionThrottleConfig {
   public enum RebalanceType {
     LOAD_BALANCE,
     RECOVERY_BALANCE,
-    ANY,
+    ANY, // A type used for general throttling (to account for all types of rebalance)
     NONE
   }
 
@@ -56,8 +56,8 @@ public class StateTransitionThrottleConfig {
   ThrottleScope _throttleScope;
   Long _maxPartitionInTransition;
 
-  public StateTransitionThrottleConfig(RebalanceType rebalanceType,
-      ThrottleScope throttleScope, long maxPartitionInTransition) {
+  public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope,
+      long maxPartitionInTransition) {
     _rebalanceType = rebalanceType;
     _throttleScope = throttleScope;
     _maxPartitionInTransition = maxPartitionInTransition;
@@ -79,11 +79,10 @@ public class StateTransitionThrottleConfig {
 
   /**
    * Generate the JSON String for StateTransitionThrottleConfig.
-   *
    * @return Json String for this config.
    */
   public String toJSON() {
-    Map<String, String> configMap = new HashMap<String, String>();
+    Map<String, String> configMap = new HashMap<>();
     configMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
     configMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
     configMap.put(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name(),
@@ -94,7 +93,7 @@ public class StateTransitionThrottleConfig {
       ObjectWriter objectWriter = OBJECT_MAPPER.writer();
       jsonStr = objectWriter.writeValueAsString(configMap);
     } catch (IOException e) {
-      logger.error("Failed to convert config map to JSON object! " + configMap);
+      logger.error("Failed to convert config map to JSON object! {}", configMap);
     }
 
     return jsonStr;
@@ -102,9 +101,9 @@ public class StateTransitionThrottleConfig {
 
   /**
    * Instantiate a throttle config from a config JSON string.
-   *
    * @param configJsonStr
-   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid
+   *         StateTransitionThrottleConfig.
    */
   public static StateTransitionThrottleConfig fromJSON(String configJsonStr) {
     StateTransitionThrottleConfig throttleConfig = null;
@@ -113,23 +112,21 @@ public class StateTransitionThrottleConfig {
       Map<String, String> configsMap = objectReader.readValue(configJsonStr);
       throttleConfig = fromConfigMap(configsMap);
     } catch (IOException e) {
-      logger.error("Failed to convert JSON string to config map! " + configJsonStr);
+      logger.error("Failed to convert JSON string to config map! {}", configJsonStr);
     }
 
     return throttleConfig;
   }
 
   /**
-   * Instantiate a throttle config from a config map
-   *
+   * Instantiate a throttle config from a config map.
    * @param configsMap
-   *
    * @return StateTransitionThrottleConfig or null if the given configs map is not a valid
-   * StateTransitionThrottleConfig.
+   *         StateTransitionThrottleConfig.
    */
   public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
-    if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || !configsMap
-        .containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+    if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name())
+        || !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
       // not a valid StateTransitionThrottleConfig
       return null;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 166cc63..ba5a29a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -35,17 +35,16 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 /**
- * For partition compute the Intermediate State (instance,state) pair based on the BestPossible
- * State and Current State, with all constraints applied (such as state transition throttling).
+ * For partition compute the Intermediate State (instance,state) pair based on the BestPossibleState
+ * and CurrentState, with all constraints applied (such as state transition throttling).
  */
 public class IntermediateStateCalcStage extends AbstractBaseStage {
-  private static final Logger logger = LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
+  private static final Logger logger =
+      LoggerFactory.getLogger(IntermediateStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.name());
-
+    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
@@ -53,15 +52,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
         || cache == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires CURRENT_STATE|BEST_POSSIBLE_STATE|RESOURCES|DataCache");
+      throw new StageException(String.format("Missing attributes in event: %s. "
+          + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)",
+          event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
     }
 
     IntermediateStateOutput intermediateStateOutput =
         compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput);
 
-    // Check whether any instance in the cluster could be assigned more partitions than allowed, if yes, pause the rebalancer.
+    // Make sure no instance has more replicas/partitions assigned than maxPartitionPerInstance. If
+    // it does, pause the rebalance and put the cluster on maintenance mode
     int maxPartitionPerInstance = cache.getClusterConfig().getMaxPartitionsPerInstance();
     if (maxPartitionPerInstance > 0) {
       validateMaxPartitionsPerInstance(event, cache, intermediateStateOutput,
@@ -69,76 +70,93 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
+  /**
+   * Go through each resource, and based on BestPossibleState and CurrentState, compute an
+   * IntermediateState as close to BestPossibleState as possible while maintaining throttling
+   * constraints (for example, ensure that the number of possible pending state transitions does
+   * NOT go over the set threshold).
+   * @param event
+   * @param resourceMap
+   * @param currentStateOutput
+   * @param bestPossibleStateOutput
+   * @return
+   */
   private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
       CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
-    // for each resource
-    // get the best possible state and current state
-    // try to bring immediate state close to best possible state until
-    // the possible pending state transition numbers reach the set throttle number.
     IntermediateStateOutput output = new IntermediateStateOutput();
     ClusterDataCache dataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
 
-    StateTransitionThrottleController throttleController =
-        new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
-            dataCache.getLiveInstances().keySet());
+    StateTransitionThrottleController throttleController = new StateTransitionThrottleController(
+        resourceMap.keySet(), dataCache.getClusterConfig(), dataCache.getLiveInstances().keySet());
 
-    // Resource level prioritization with numerical sortable field.
-    // If no value has been set, it will be treated as lowest priority.
-    List<ResourcePriority> prioritizedResourceList = new ArrayList<ResourcePriority>();
+    // Resource level prioritization based on the numerical (sortable) priority field.
+    // If the resource priority field is null/not set, the resource will be treated as lowest
+    // priority.
+    List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
     for (String resourceName : resourceMap.keySet()) {
       prioritizedResourceList.add(new ResourcePriority(resourceName, Integer.MIN_VALUE));
     }
-    // Not have resource level prioritization if user did not set the field name
+    // If resourcePriorityField is null at the cluster level, all resources will be considered equal
+    // in priority by keeping all priorities at MIN_VALUE
     if (dataCache.getClusterConfig().getResourcePriorityField() != null) {
       String priorityField = dataCache.getClusterConfig().getResourcePriorityField();
-
       for (ResourcePriority resourcePriority : prioritizedResourceList) {
         String resourceName = resourcePriority.getResourceName();
 
         // Will take the priority from ResourceConfig first
         // If ResourceConfig does not exist or does not have this field.
-        // Try to fetch it from ideal state. Otherwise will treated as lowest priority
+        // Try to load it from the resource's IdealState. Otherwise, keep it at the lowest priority
         if (dataCache.getResourceConfig(resourceName) != null
             && dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField) != null) {
           resourcePriority.setPriority(
               dataCache.getResourceConfig(resourceName).getSimpleConfig(priorityField));
-        } else if (dataCache.getIdealState(resourceName) != null
-            && dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField)
-            != null) {
-
+        } else if (dataCache.getIdealState(resourceName) != null && dataCache
+            .getIdealState(resourceName).getRecord().getSimpleField(priorityField) != null) {
           resourcePriority.setPriority(
               dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
         }
       }
-
-      Collections.sort(prioritizedResourceList, new ResourcePriortiyComparator());
+      Collections.sort(prioritizedResourceList, new ResourcePriorityComparator());
     }
 
-    // Update cluster status monitor mbean
-    ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    // Re-load ClusterStatusMonitor MBean
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
 
+    // Priority is applied in assignment computation by looping over the resources in order of
+    // decreasing priority, so that higher-priority resources are processed first
     for (ResourcePriority resourcePriority : prioritizedResourceList) {
       String resourceName = resourcePriority.getResourceName();
       Resource resource = resourceMap.get(resourceName);
       IdealState idealState = dataCache.getIdealState(resourceName);
-
       if (idealState == null) {
-        // if ideal state is deleted, use an empty one
-        logger.info("resource:" + resourceName + " does not exist anymore");
+        // If IdealState is null, use an empty one
+        logger.info("IdealState for resource {} does not exist; resource may not exist anymore",
+            resourceName);
         idealState = new IdealState(resourceName);
         idealState.setStateModelDefRef(resource.getStateModelDefRef());
       }
-
-      PartitionStateMap intermediatePartitionStateMap =
-          computeIntermediatePartitionState(dataCache, clusterStatusMonitor, idealState,
-              resourceMap.get(resourceName), currentStateOutput,
-              bestPossibleStateOutput.getPartitionStateMap(resourceName),
-              bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController);
+      PartitionStateMap intermediatePartitionStateMap = computeIntermediatePartitionState(dataCache,
+          clusterStatusMonitor, idealState, resourceMap.get(resourceName), currentStateOutput,
+          bestPossibleStateOutput.getPartitionStateMap(resourceName),
+          bestPossibleStateOutput.getPreferenceLists(resourceName), throttleController);
       output.setState(resourceName, intermediatePartitionStateMap);
     }
     return output;
   }
 
+  /**
+   * Go through every instance in the assignment and check that each instance does NOT have more
+   * replicas for partitions assigned to it than maxPartitionsPerInstance. If the assignment
+   * violates this, put the cluster on maintenance mode.
+   * This logic could be integrated with compute() for IntermediateState calculation but appended
+   * separately for visibility and testing. Additionally, performing validation after compute()
+   * ensures that we have a full intermediate state mapping complete prior to validation.
+   * @param event
+   * @param cache
+   * @param intermediateStateOutput
+   * @param maxPartitionPerInstance
+   */
   private void validateMaxPartitionsPerInstance(ClusterEvent event, ClusterDataCache cache,
       IntermediateStateOutput intermediateStateOutput, int maxPartitionPerInstance) {
     Map<String, PartitionStateMap> resourceStatesMap =
@@ -147,9 +165,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     for (String resource : resourceStatesMap.keySet()) {
       IdealState idealState = cache.getIdealState(resource);
-      if (idealState != null && idealState.getStateModelDefRef()
-          .equals(BuiltInStateModelDefinitions.Task.name())) {
-        // ignore task here. Task has its own throttling logic
+      if (idealState != null
+          && idealState.getStateModelDefRef().equals(BuiltInStateModelDefinitions.Task.name())) {
+        // Ignore task here. Task has its own throttling logic
         continue;
       }
 
@@ -158,7 +176,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       for (Partition p : stateMaps.keySet()) {
         Map<String, String> stateMap = stateMaps.get(p);
         for (String instance : stateMap.keySet()) {
-          //ignore replica to be dropped.
+          // If this replica is in DROPPED state, do not count it in the partition count since it is
+          // to be dropped
           String state = stateMap.get(instance);
           if (state.equals(HelixDefinedState.DROPPED.name())) {
             continue;
@@ -167,18 +186,25 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           if (!instancePartitionCounts.containsKey(instance)) {
             instancePartitionCounts.put(instance, 0);
           }
-          int partitionCount = instancePartitionCounts.get(instance);
+          int partitionCount = instancePartitionCounts.get(instance); // Number of replicas (from
+          // different partitions) held
+          // in this instance
           partitionCount++;
           if (partitionCount > maxPartitionPerInstance) {
             HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
             String errMsg = String.format(
-                "Partition count to be assigned to instance %s is greater than %d. Stop rebalance and pause the cluster %s",
+                "Problem: according to this assignment, instance %s contains more "
+                    + "replicas/partitions than the maximum number allowed (%d). Pipeline will "
+                    + "stop the rebalance and pause the cluster %s",
                 instance, maxPartitionPerInstance, cache.getClusterName());
             if (manager != null) {
-              manager.getClusterManagmentTool()
-                  .enableMaintenanceMode(manager.getClusterName(), true, errMsg);
+              manager.getClusterManagmentTool().enableMaintenanceMode(manager.getClusterName(),
+                  true, errMsg);
             } else {
-              logger.error("Failed to pause cluster, HelixManager is not set!");
+              logger.error(
+                  "HelixManager is not set/null! Failed to pause this cluster/enable maintenance"
+                      + " mode due to an instance being assigned more replicas/partitions than "
+                      + "the limit.");
             }
             throw new HelixException(errMsg);
           }
@@ -188,28 +214,40 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
+  /**
+   * Compute intermediate partition states for a prioritized resource.
+   * @param cache
+   * @param clusterStatusMonitor
+   * @param idealState
+   * @param resource
+   * @param currentStateOutput
+   * @param bestPossiblePartitionStateMap
+   * @param preferenceLists
+   * @param throttleController
+   * @return
+   */
   private PartitionStateMap computeIntermediatePartitionState(ClusterDataCache cache,
       ClusterStatusMonitor clusterStatusMonitor, IdealState idealState, Resource resource,
       CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap,
       Map<String, List<String>> preferenceLists,
       StateTransitionThrottleController throttleController) {
     String resourceName = resource.getResourceName();
-    logger.debug("Processing resource:" + resourceName);
+    logger.debug("Processing resource: {}", resourceName);
 
-    if (!throttleController.isThrottleEnabled() || !IdealState.RebalanceMode.FULL_AUTO
-        .equals(idealState.getRebalanceMode()) || cache.isTaskCache()) {
-      // We only apply throttling on FULL-AUTO now.
+    // Throttling is applied only on FULL-AUTO mode
+    if (!throttleController.isThrottleEnabled()
+        || !IdealState.RebalanceMode.FULL_AUTO.equals(idealState.getRebalanceMode())
+        || cache.isTaskCache()) {
       return bestPossiblePartitionStateMap;
     }
 
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
-
     PartitionStateMap intermediatePartitionStateMap = new PartitionStateMap(resourceName);
 
     Set<Partition> partitionsNeedRecovery = new HashSet<>();
-    Set<Partition> partitionsNeedLoadbalance = new HashSet<>();
-    Set<Partition> partitionshaveErrorStateReplica =  new HashSet<>();
+    Set<Partition> partitionsNeedLoadBalance = new HashSet<>();
+    Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
     for (Partition partition : resource.getPartitions()) {
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -217,93 +255,98 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           bestPossiblePartitionStateMap.getPartitionMap(partition);
       List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
 
-      RebalanceType rebalanceType =
-          getRebalanceType(cache, bestPossibleMap, preferenceList, stateModelDef, currentStateMap,
-              idealState);
+      RebalanceType rebalanceType = getRebalanceType(cache, bestPossibleMap, preferenceList,
+          stateModelDef, currentStateMap, idealState);
 
-      // TODO refine getRebalanceType to return more accurate rebalance types.
-      // So following logic doesn't need to check for more details.
-      boolean rebalanceNeeded = false;
+      // TODO: refine getRebalanceType to return more accurate rebalance types. So the following
+      // logic doesn't need to check for more details.
+      boolean isRebalanceNeeded = false;
+
+      // Number of states required by StateModelDefinition are not satisfied, need recovery
       if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
-        // Check if any error exist
+        // Check whether any replica of this partition is in ERROR state
         if (currentStateMap.values().contains(HelixDefinedState.ERROR.name())) {
-          partitionshaveErrorStateReplica.add(partition);
+          partitionsWithErrorStateReplica.add(partition);
         }
         // Check if recovery is needed for this partition
         if (!currentStateMap.equals(bestPossibleMap)) {
           partitionsNeedRecovery.add(partition);
-          rebalanceNeeded = true;
-        } // else, if currentState == bestPossibleState, no rebalance needed
+          isRebalanceNeeded = true;
+        }
+        // Number of states required by StateModelDefinition are satisfied, but to achieve
+        // BestPossibleState, need load balance
       } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
-        partitionsNeedLoadbalance.add(partition);
-        rebalanceNeeded = true;
+        partitionsNeedLoadBalance.add(partition);
+        isRebalanceNeeded = true;
       }
-
-      if (!rebalanceNeeded) {
-        // no rebalance needed.
+      // Currently at BestPossibleState, no further action necessary
+      if (!isRebalanceNeeded) {
         Map<String, String> intermediateMap = new HashMap<>(bestPossibleMap);
         intermediatePartitionStateMap.setState(partition, intermediateMap);
       }
     }
 
     if (!partitionsNeedRecovery.isEmpty()) {
-      logger.info(
-          "recovery balance needed for " + resourceName + " partitions: " + partitionsNeedRecovery);
+      logger.info("Recovery balance needed for {} partitions: {}", resourceName,
+          partitionsNeedRecovery);
     }
-    if (!partitionsNeedLoadbalance.isEmpty()) {
-      logger.info(
-          "load balance needed for " + resourceName + " partitions: " + partitionsNeedLoadbalance);
+    if (!partitionsNeedLoadBalance.isEmpty()) {
+      logger.info("Load balance needed for partitions: {}", resourceName,
+          partitionsNeedLoadBalance);
     }
-    if (!partitionshaveErrorStateReplica.isEmpty()) {
-      logger.info("partition currently has ERROR replica in " + resourceName + " partitions: "
-          + partitionshaveErrorStateReplica);
+    if (!partitionsWithErrorStateReplica.isEmpty()) {
+      logger.info("Partition currently has an ERROR replica in {} partitions: {}", resourceName,
+          partitionsWithErrorStateReplica);
     }
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
-        partitionsNeedRecovery, partitionsNeedLoadbalance);
+        partitionsNeedRecovery, partitionsNeedLoadBalance);
 
-    // perform recovery rebalance
+    // Perform recovery balance
     Set<Partition> recoveryThrottledPartitions =
-    recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
-        intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
-        cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
-
-    Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadbalance;
-
-    long maxAllowedErrorPartitions = cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance();
-    if (partitionsNeedRecovery.isEmpty() &&
-        (maxAllowedErrorPartitions < 0
-            || partitionshaveErrorStateReplica.size() <= maxAllowedErrorPartitions)) {
-      // perform load balance only if
-      //   1. no recovery operation to be scheduled.
-      //   2. error partition count is less than configured limitation.
-      loadbalanceThrottledPartitions =
-          loadRebalance(resource, currentStateOutput, bestPossiblePartitionStateMap,
-              throttleController, intermediatePartitionStateMap, partitionsNeedLoadbalance,
-              currentStateOutput.getCurrentStateMap(resourceName));
+        recoveryRebalance(resource, bestPossiblePartitionStateMap, throttleController,
+            intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput,
+            cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
+
+    // Perform load balance
+    Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadBalance;
+
+    long maxAllowedErrorPartitions =
+        cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance();
+
+    // TODO: Logic here needs change - when there is an error partition, load should still happen
+
+    // Perform load balance only if
+    // 1. no recovery operation to be scheduled
+    // 2. error partition count is less than configured limitation
+    if (partitionsNeedRecovery.isEmpty() && (maxAllowedErrorPartitions < 0
+        || partitionsWithErrorStateReplica.size() <= maxAllowedErrorPartitions)) {
+      loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
+          bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
+          partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName));
     } else {
-      // skip load balance, use current state mapping
-      for (Partition p : partitionsNeedLoadbalance) {
+      // Skip load balance and current states just become intermediate states
+      for (Partition partition : partitionsNeedLoadBalance) {
         Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, p);
-        intermediatePartitionStateMap.setState(p, currentStateMap);
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+        intermediatePartitionStateMap.setState(partition, currentStateMap);
       }
     }
 
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(),
-          partitionsNeedLoadbalance.size(), recoveryThrottledPartitions.size(),
+          partitionsNeedLoadBalance.size(), recoveryThrottledPartitions.size(),
           loadbalanceThrottledPartitions.size());
     }
 
     if (logger.isDebugEnabled()) {
-      logParitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
-          partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadbalance,
+      logPartitionMapState(resourceName, new HashSet<>(resource.getPartitions()),
+          partitionsNeedRecovery, recoveryThrottledPartitions, partitionsNeedLoadBalance,
           loadbalanceThrottledPartitions, currentStateOutput, bestPossiblePartitionStateMap,
           intermediatePartitionStateMap);
     }
 
-    logger.debug("End processing resource:" + resourceName);
+    logger.debug("End processing resource: {}", resourceName);
     return intermediatePartitionStateMap;
   }
 
@@ -317,8 +360,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Maps instance to its current state
       Map<String, String> currentStateMap =
           currentStateOutput.getCurrentStateMap(resourceName, partition);
+      // Maps instance to its pending (next) state
       Map<String, String> pendingMap =
           currentStateOutput.getPendingStateMap(resourceName, partition);
 
@@ -334,11 +379,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
         throttleController.chargeResource(rebalanceType, resourceName);
 
         // charge each instance.
-        for (String ins : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(ins);
-          String pendingState = pendingMap.get(ins);
+        for (String instance : pendingMap.keySet()) {
+          String currentState = currentStateMap.get(instance);
+          String pendingState = pendingMap.get(instance);
           if (pendingState != null && !pendingState.equals(currentState)) {
-            throttleController.chargeInstance(rebalanceType, ins);
+            throttleController.chargeInstance(rebalanceType, instance);
           }
         }
       }
@@ -346,137 +391,173 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   *  Perform any recovery balance if needed, fill intermediatePartitionStateMap
-   *  if recover rebalance is needed.
-   *  return the partitions needs recoveryRebalance but get throttled
+   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
+   * each partition, throttle state transitions if needed. Also populate
+   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
+   * CurrentState (if throttled).
+   * @param resource
+   * @param bestPossiblePartitionStateMap
+   * @param throttleController
+   * @param intermediatePartitionStateMap
+   * @param partitionsNeedRecovery
+   * @param currentStateOutput
+   * @param topState
+   * @return a set of partitions that need recovery but did not get recovered due to throttling
    */
-  private Set<Partition> recoveryRebalance(Resource resource, PartitionStateMap bestPossiblePartitionStateMap,
+  private Set<Partition> recoveryRebalance(Resource resource,
+      PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery,
       CurrentStateOutput currentStateOutput, String topState) {
-    Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<Partition>();
+    String resourceName = resource.getResourceName();
+    Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
 
+    // Maps Partition -> Instance -> State
     Map<Partition, Map<String, String>> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resource.getResourceName());
-    List<Partition> partitionsNeedRecoveryPrioritized =
-        new ArrayList<Partition>(partitionsNeedRecovery);
+        currentStateOutput.getCurrentStateMap(resourceName);
+    List<Partition> partitionsNeedRecoveryPrioritized = new ArrayList<>(partitionsNeedRecovery);
 
-    // TODO : Due to currently using JAVA 1.6, the original order of partitions list is not
-    // determinable, sort the list by partition name and remove the code after bump to JAVA 1.8
+    // TODO: Remove this sort by partition name when Java 1.8 is used
+    // We want the result of the intermediate state calculation to be deterministic. We sort here by
+    // partition name to ensure that the order is consistent for inputs fed into
+    // PartitionPriorityComparator sort
     Collections.sort(partitionsNeedRecoveryPrioritized, new Comparator<Partition>() {
       @Override
-      public int compare(Partition o1, Partition o2) {
-        return o1.getPartitionName().compareTo(o2.getPartitionName());
+      public int compare(Partition partition1, Partition partition2) {
+        return partition1.getPartitionName().compareTo(partition2.getPartitionName());
       }
     });
+    Collections.sort(partitionsNeedRecoveryPrioritized, new PartitionPriorityComparator(
+        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, topState, true));
 
-    Collections.sort(partitionsNeedRecoveryPrioritized,
-        new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
-            currentStateMap, topState, true));
-
+    // For each partition, apply throttling if needed.
     for (Partition partition : partitionsNeedRecoveryPrioritized) {
-      throtteStateTransitions(throttleController, resource.getResourceName(), partition,
+      throttleStateTransitionsForPartition(throttleController, resourceName, partition,
           currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
     }
-
-    logger.info(String
-        .format("%s: needRecovery: %d, recoverybalanceThrottled: %d", resource.getResourceName(),
-            partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size()));
+    logger.info(String.format(
+        "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery but throttled (not recovered): %d",
+        resourceName, partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size()));
     return partitionRecoveryBalanceThrottled;
   }
 
-  /* return the partitions needs loadRebalance but get throttled */
+  /**
+   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
+   * each partition, throttle state transitions if needed. Also populate
+   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
+   * CurrentState (if throttled).
+   * @param resource
+   * @param currentStateOutput
+   * @param bestPossiblePartitionStateMap
+   * @param throttleController
+   * @param intermediatePartitionStateMap
+   * @param partitionsNeedLoadbalance
+   * @param currentStateMaps
+   * @return a set of partitions that need to be load-balanced but did not due to throttling
+   */
   private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
       Map<Partition, Map<String, String>> currentStateMaps) {
     String resourceName = resource.getResourceName();
-    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>();
+    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
 
     List<Partition> partitionsNeedLoadRebalancePrioritized =
         new ArrayList<>(partitionsNeedLoadbalance);
 
-    // TODO : Due to currently using JAVA 1.6, the original order of partitions list is not
-    // determinable, sort the list by partition name and remove the code after bump to JAVA 1.8
+    // TODO: Remove this sort by partition name when Java 1.8 is used
+    // We want the result of the intermediate state calculation to be deterministic. We sort here by
+    // partition name to ensure that the order is consistent for inputs fed into
+    // PartitionPriorityComparator sort
     Collections.sort(partitionsNeedLoadRebalancePrioritized, new Comparator<Partition>() {
       @Override
-      public int compare(Partition o1, Partition o2) {
-        return o1.getPartitionName().compareTo(o2.getPartitionName());
+      public int compare(Partition partition1, Partition partition2) {
+        return partition1.getPartitionName().compareTo(partition2.getPartitionName());
       }
     });
-
-    Collections.sort(partitionsNeedLoadRebalancePrioritized,
-        new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
-            currentStateMaps, "", false));
-
+    Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator(
+        bestPossiblePartitionStateMap.getStateMap(), currentStateMaps, "", false));
     for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
-      throtteStateTransitions(throttleController, resourceName, partition, currentStateOutput,
-          bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
+      throttleStateTransitionsForPartition(throttleController, resourceName, partition,
+          currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
     }
-
-    logger.info(String
-        .format("%s: loadbalanceNeeded: %d, loadbalanceThrottled: %d", resource.getResourceName(), partitionsNeedLoadbalance.size(),
-            partitionsLoadbalanceThrottled.size()));
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("recovery balance throttled for " + resourceName + " partitions: "
-          + partitionsLoadbalanceThrottled);
-    }
-
+    logger.info(String.format(
+        "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
+            + " load-balance but throttled (not load-balanced): %d",
+        resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size()));
     return partitionsLoadbalanceThrottled;
   }
 
-  private void throtteStateTransitions(StateTransitionThrottleController throttleController,
-      String resourceName, Partition partition, CurrentStateOutput currentStateOutput,
+  /**
+   * Check the status on throttling at every level (cluster, resource, instance) and set
+   * intermediatePartitionStateMap accordingly per partition.
+   * @param throttleController
+   * @param resourceName
+   * @param partition
+   * @param currentStateOutput
+   * @param bestPossiblePartitionStateMap
+   * @param partitionsThrottled
+   * @param intermediatePartitionStateMap
+   * @param rebalanceType
+   */
+  private void throttleStateTransitionsForPartition(
+      StateTransitionThrottleController throttleController, String resourceName,
+      Partition partition, CurrentStateOutput currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType) {
 
     Map<String, String> currentStateMap =
         currentStateOutput.getCurrentStateMap(resourceName, partition);
     Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
     allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<String, String>();
+    Map<String, String> intermediateMap = new HashMap<>();
 
-    boolean throttled = false;
-    if (throttleController.throttleforResource(rebalanceType, resourceName)) {
-      throttled = true;
+    boolean hasReachedThrottlingLimit = false;
+    if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
+      hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        logger
-            .debug("Throttled on resource for " + resourceName + " " + partition.getPartitionName());
+        logger.debug("Throttled on partition: {} in resource: {}", partition.getPartitionName(),
+            resourceName);
       }
     } else {
-      // throttle if any of the instance can not handle the state transition
-      for (String ins : allInstances) {
-        String currentState = currentStateMap.get(ins);
-        String bestPossibleState = bestPossibleMap.get(ins);
+      // throttle if any of the instances are not able to accept state transitions
+      for (String instance : allInstances) {
+        String currentState = currentStateMap.get(instance);
+        String bestPossibleState = bestPossibleMap.get(instance);
         if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
-          if (throttleController.throttleForInstance(rebalanceType, ins)) {
-            throttled = true;
+          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
+            hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
-              logger.debug("Throttled because instance " + ins + " for " + resourceName + " " + partition
-                      .getPartitionName());
+              logger.debug(
+                  "Throttled because of instance: {} for partition: {} in resource: {}", instance,
+                  partition.getPartitionName(), resourceName);
             }
+            break;
           }
         }
       }
     }
-
-    if (!throttled) {
+    if (!hasReachedThrottlingLimit) {
+      // This implies that there is room for more pending states. Find
+      // instances with a replica whose current state is different from BestPossibleState and
+      // "charge" for it, and bestPossibleStates will become intermediate states
       intermediateMap.putAll(bestPossibleMap);
-      for (String ins : allInstances) {
-        String currentState = currentStateMap.get(ins);
-        String bestPossibleState = bestPossibleMap.get(ins);
+      for (String instance : allInstances) {
+        String currentState = currentStateMap.get(instance);
+        String bestPossibleState = bestPossibleMap.get(instance);
         if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
-          throttleController.chargeInstance(rebalanceType, ins);
+          throttleController.chargeInstance(rebalanceType, instance);
         }
       }
       throttleController.chargeCluster(rebalanceType);
       throttleController.chargeResource(rebalanceType, resourceName);
     } else {
+      // No more room for more pending states; current states will just become intermediate states
+      // Add this partition to a set of throttled partitions
       intermediateMap.putAll(currentStateMap);
       partitionsThrottled.add(partition);
     }
@@ -484,17 +565,19 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   * Given preferenceList, bestPossibleState and currentState, determine which type of rebalance is
-   * needed to fit the idea states defined by the state model definition.
-   *
-   * @return rebalance type needed to bring the replicas to idea states
-   *         RECOVERY_BALANCE - required states is not available through all replicas
-   *         NONE - current state matches the idea state
-   *         LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the allocation
+   * For a partition, given its preferenceList, bestPossibleState, and currentState, determine which
+   * type of rebalance is needed to model IdealState's states defined by the state model definition.
+   * @return RebalanceType needed to bring the replicas to ideal states
+   *         RECOVERY_BALANCE - not all required states (replicas) are available through all
+   *         replicas
+   *         NONE - current state matches the ideal state
+   *         LOAD_BALANCE - although all replicas required exist, Helix needs to optimize the
+   *         allocation
    */
   private RebalanceType getRebalanceType(ClusterDataCache cache,
       Map<String, String> bestPossibleMap, List<String> preferenceList,
-      StateModelDefinition stateModelDef, Map<String, String> currentStateMap, IdealState idealState) {
+      StateModelDefinition stateModelDef, Map<String, String> currentStateMap,
+      IdealState idealState) {
     if (preferenceList == null) {
       preferenceList = Collections.emptyList();
     }
@@ -503,80 +586,95 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     Set<String> activeList = new HashSet<>(preferenceList);
     activeList.retainAll(cache.getEnabledLiveInstances());
 
-    // Check current states against state model define. If doesn't match, need recovery.
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
     LinkedHashMap<String, Integer> expectedStateCountMap =
-        stateModelDef.getStateCountMap(activeList.size(), replica);
-    Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap);
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+    Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap); // Current
+    // counts
 
+    // Go through each state and compare counts
     for (String state : expectedStateCountMap.keySet()) {
       Integer expectedCount = expectedStateCountMap.get(state);
       Integer currentCount = currentStateCounts.get(state);
-      expectedCount = expectedCount == null? 0 : expectedCount;
-      currentCount = currentCount == null? 0 : currentCount;
+      expectedCount = expectedCount == null ? 0 : expectedCount;
+      currentCount = currentCount == null ? 0 : currentCount;
 
+      // If counts do not match up, this partition requires recovery
       if (currentCount < expectedCount) {
-        if (!state.equals(HelixDefinedState.DROPPED.name()) &&
-            !state.equals(HelixDefinedState.ERROR.name()) &&
-            !state.equals(stateModelDef.getInitialState())) {
+        // Recovery is not needed in cases where this partition just started, was dropped, or is in
+        // error
+        if (!state.equals(HelixDefinedState.DROPPED.name())
+            && !state.equals(HelixDefinedState.ERROR.name())
+            && !state.equals(stateModelDef.getInitialState())) {
           return RebalanceType.RECOVERY_BALANCE;
         }
       }
     }
-    // No recovery needed, all expected replicas exist.
-    // Check if the calculated best possible states matches current states
+    // No recovery needed, all expected replicas exist
+    // Check if this partition is actually in the BestPossibleState
     if (currentStateMap.equals(bestPossibleMap)) {
-      return RebalanceType.NONE;
+      return RebalanceType.NONE; // No further action required
     } else {
-      // All other cases is categorized as load balance change
-      return RebalanceType.LOAD_BALANCE;
+      return RebalanceType.LOAD_BALANCE; // Required state counts are satisfied, but in order to
+      // achieve BestPossibleState, load balance may be required
+      // to shift replicas around
     }
   }
 
-  private void logParitionMapState(String resource, Set<Partition> allPartitions,
+  /**
+   * Log rebalancer metadata for debugging purposes.
+   * @param resource
+   * @param allPartitions
+   * @param recoveryPartitions
+   * @param recoveryThrottledPartitions
+   * @param loadbalancePartitions
+   * @param loadbalanceThrottledPartitions
+   * @param currentStateOutput
+   * @param bestPossibleStateMap
+   * @param intermediateStateMap
+   */
+  private void logPartitionMapState(String resource, Set<Partition> allPartitions,
       Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions,
       Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions,
-      CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossibleStateMap,
+      CurrentStateOutput currentStateOutput, PartitionStateMap bestPossibleStateMap,
       PartitionStateMap intermediateStateMap) {
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Partitions need recovery: " + recoveryPartitions
-          + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions);
-      logger.debug("Partitions need loadbalance: " + loadbalancePartitions
-          + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions);
+      logger.debug("Partitions need recovery: {}\nPartitions get throttled on recovery: {}",
+          recoveryPartitions, recoveryThrottledPartitions);
+      logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: {}",
+          loadbalancePartitions, loadbalanceThrottledPartitions);
     }
 
     for (Partition partition : allPartitions) {
       if (logger.isDebugEnabled()) {
-        logger.debug(
-            partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition));
-        logger.debug(partition + ": Current State: " + currentStateOutput
-            .getCurrentStateMap(resource, partition));
-        logger.debug(partition + ": Pending state: " + currentStateOutput
-            .getPendingMessageMap(resource, partition));
-        logger.debug(
-            partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition));
+        logger.debug(partition + ": Best possible map: {}",
+            bestPossibleStateMap.getPartitionMap(partition));
+        logger.debug(partition + ": Current State: {}",
+            currentStateOutput.getCurrentStateMap(resource, partition));
+        logger.debug(partition + ": Pending state: {}",
+            currentStateOutput.getPendingMessageMap(resource, partition));
+        logger.debug(partition + ": Intermediate state: {}",
+            intermediateStateMap.getPartitionMap(partition));
       }
     }
   }
 
-  private static class ResourcePriortiyComparator implements Comparator<ResourcePriority> {
-    @Override public int compare(ResourcePriority r1, ResourcePriority r2) {
-      return r2.compareTo(r1);
-    }
-  }
-
+  /**
+   * POJO that maps resource name to its priority represented by an integer.
+   */
   private static class ResourcePriority {
     private String _resourceName;
-    private Integer _priority;
+    private int _priority;
 
-    public ResourcePriority(String resourceName, Integer priority) {
+    ResourcePriority(String resourceName, Integer priority) {
       _resourceName = resourceName;
       _priority = priority;
     }
 
     public int compareTo(ResourcePriority resourcePriority) {
-      return this._priority.compareTo(resourcePriority._priority);
+      return Integer.compare(_priority, resourcePriority._priority);
     }
 
     public String getResourceName() {
@@ -593,16 +691,23 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  //  Compare partitions according following standard:
-  //  1) Partition without top state always is the highest priority.
-  //  2) For partition with top-state, the more number of active replica it has, the less priority.
+  private static class ResourcePriorityComparator implements Comparator<ResourcePriority> {
+    @Override
+    public int compare(ResourcePriority priority1, ResourcePriority priority2) {
+      return priority2.compareTo(priority1);
+    }
+  }
+
+  // Compare partitions according to the following standard:
+  // 1) A partition without top state always has the highest priority.
+  // 2) For a partition with top state, the more active replicas it has, the lower its priority.
   private class PartitionPriorityComparator implements Comparator<Partition> {
     private Map<Partition, Map<String, String>> _bestPossibleMap;
     private Map<Partition, Map<String, String>> _currentStateMap;
     private String _topState;
     private boolean _recoveryRebalance;
 
-    public PartitionPriorityComparator(Map<Partition, Map<String, String>> bestPossibleMap,
+    PartitionPriorityComparator(Map<Partition, Map<String, String>> bestPossibleMap,
         Map<Partition, Map<String, String>> currentStateMap, String topState,
         boolean recoveryRebalance) {
       _bestPossibleMap = bestPossibleMap;
@@ -614,50 +719,45 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     @Override
     public int compare(Partition p1, Partition p2) {
       if (_recoveryRebalance) {
-        Integer missTopState1 = getMissTopStateIndex(p1);
-        Integer missTopState2 = getMissTopStateIndex(p2);
-
+        int missTopState1 = getMissTopStateIndex(p1);
+        int missTopState2 = getMissTopStateIndex(p2);
         // Highest priority for the partition without top state
-        if (!missTopState1.equals(missTopState2)) {
-          return missTopState1.compareTo(missTopState2);
+        if (missTopState1 != missTopState2) {
+          return Integer.compare(missTopState1, missTopState2);
         }
-
-        Integer currentActiveReplicas1 = getCurrentActiveReplicas(p1);
-        Integer currentActiveReplicas2 = getCurrentActiveReplicas(p2);
-        return currentActiveReplicas1.compareTo(currentActiveReplicas2);
+        // Higher priority for the partition with fewer active replicas
+        int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
+        int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+        return Integer.compare(currentActiveReplicas1, currentActiveReplicas2);
       }
-
-      // Higher priority for the partition, which has less number of active replicas
-      Integer idealStateMatched1 = getIdealStateMatched(p1);
-      Integer idealStateMatched2 = getIdealStateMatched(p2);
-
-      return idealStateMatched1.compareTo(idealStateMatched2);
+      // Higher priority for the partition with fewer replicas with states matching with IdealState
+      int idealStateMatched1 = getIdealStateMatched(p1);
+      int idealStateMatched2 = getIdealStateMatched(p2);
+      return Integer.compare(idealStateMatched1, idealStateMatched2);
     }
 
-    private Integer getMissTopStateIndex(Partition partition) {
-      // 0 if no replica in top-state, 1 if it has at least one replica in top-state.
-      if (!_currentStateMap.containsKey(partition) || !_currentStateMap.get(partition).values()
-          .contains(_topState)) {
+    private int getMissTopStateIndex(Partition partition) {
+      // 0 if no replicas in top-state, 1 if it has at least one replica in top-state.
+      if (!_currentStateMap.containsKey(partition)
+          || !_currentStateMap.get(partition).values().contains(_topState)) {
         return 0;
       }
       return 1;
     }
 
-    private Integer getCurrentActiveReplicas(Partition partition) {
-      Integer currentActiveReplicas = 0;
+    private int getCurrentActiveReplicas(Partition partition) {
+      int currentActiveReplicas = 0;
       if (!_currentStateMap.containsKey(partition)) {
         return currentActiveReplicas;
       }
-
       // Initialize state -> number of this state map
-      Map<String, Integer> stateCountMap = new HashMap<String, Integer>();
+      Map<String, Integer> stateCountMap = new HashMap<>();
       for (String state : _bestPossibleMap.get(partition).values()) {
         if (!stateCountMap.containsKey(state)) {
           stateCountMap.put(state, 0);
         }
         stateCountMap.put(state, stateCountMap.get(state) + 1);
       }
-
       // Search the state map
       for (String state : _currentStateMap.get(partition).values()) {
         if (stateCountMap.containsKey(state) && stateCountMap.get(state) > 0) {
@@ -665,16 +765,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           stateCountMap.put(state, stateCountMap.get(state) - 1);
         }
       }
-
       return currentActiveReplicas;
     }
 
-    private Integer getIdealStateMatched(Partition partition) {
-      Integer matchedState = 0;
+    private int getIdealStateMatched(Partition partition) {
+      int matchedState = 0;
       if (!_currentStateMap.containsKey(partition)) {
         return matchedState;
       }
-
       for (String instance : _bestPossibleMap.get(partition).keySet()) {
         if (_bestPossibleMap.get(partition).get(instance)
             .equals(_currentStateMap.get(partition).get(instance))) {
@@ -684,4 +782,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return matchedState;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/323fbd04/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index cf1e10d..4a2baa7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -1,5 +1,24 @@
 package org.apache.helix.controller.stages;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -11,31 +30,28 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Output for IntermediateStateCalStage.
+ * StateTransitionThrottleController is used to compute IntermediateState; it caches allowed
+ * transition counts to see if any state transitions depending on the rebalance type must be held
+ * off.
  */
 class StateTransitionThrottleController {
   private static Logger logger = LoggerFactory.getLogger(StateTransitionThrottleController.class);
 
   // pending allowed transition counts in the cluster level for recovery and load balance
-  Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
+  private Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
 
   // pending allowed transition counts for each instance and resource
-  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
-      _pendingTransitionAllowedPerInstance;
-  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
-      _pendingTransitionAllowedPerResource;
+  private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerInstance;
+  private Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> _pendingTransitionAllowedPerResource;
 
   private boolean _throttleEnabled = false;
 
   public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig,
       Set<String> liveInstances) {
     super();
-    _pendingTransitionAllowedInCluster =
-        new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>();
-    _pendingTransitionAllowedPerInstance =
-        new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
-    _pendingTransitionAllowedPerResource =
-        new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+    _pendingTransitionAllowedInCluster = new HashMap<>();
+    _pendingTransitionAllowedPerInstance = new HashMap<>();
+    _pendingTransitionAllowedPerResource = new HashMap<>();
 
     if (clusterConfig == null) {
       logger.warn("Cluster config is not found, no throttle config set!");
@@ -52,103 +68,112 @@ class StateTransitionThrottleController {
 
     for (StateTransitionThrottleConfig config : throttleConfigs) {
       switch (config.getThrottleScope()) {
-      case CLUSTER:
-        _pendingTransitionAllowedInCluster
-            .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
-        _throttleEnabled = true;
-        break;
-      case RESOURCE:
-        for (String resource : resources) {
-          if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
-            _pendingTransitionAllowedPerResource
-                .put(resource, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+        case CLUSTER:
+          _pendingTransitionAllowedInCluster.put(config.getRebalanceType(),
+              config.getMaxPartitionInTransition());
+          _throttleEnabled = true;
+          break;
+        case RESOURCE:
+          for (String resource : resources) {
+            if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
+              _pendingTransitionAllowedPerResource.put(resource,
+                  new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+            }
+            _pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
+                config.getMaxPartitionInTransition());
           }
-          _pendingTransitionAllowedPerResource.get(resource)
-              .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
-        }
-        _throttleEnabled = true;
-        break;
-      case INSTANCE:
-        for (String instance : liveInstances) {
-          if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
-            _pendingTransitionAllowedPerInstance
-                .put(instance, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+          _throttleEnabled = true;
+          break;
+        case INSTANCE:
+          for (String instance : liveInstances) {
+            if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
+              _pendingTransitionAllowedPerInstance.put(instance,
+                  new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+            }
+            _pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
+                config.getMaxPartitionInTransition());
           }
-          _pendingTransitionAllowedPerInstance.get(instance)
-              .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
-        }
-        _throttleEnabled = true;
-        break;
+          _throttleEnabled = true;
+          break;
       }
     }
   }
 
   /**
-   * Whether any throttle config enabled for this cluster.
-   *
-   * @return
+   * Returns the flag that indicates throttling is applied at any level (cluster, resource, and
+   * instance).
+   * @return true if throttling operation is present
    */
   protected boolean isThrottleEnabled() {
     return _throttleEnabled;
   }
 
   /**
-   * Check if state transition on a partition should be throttled.
-   *
-   * @return true if it should be throttled, otherwise, false.
+   * Check if state transitions for a particular Rebalance type must be throttled. Assuming the
+   * "charging" already happened at this level, this method purely checks whether the throttle value
+   * has reached 0 or below.
+   * @return true if it should be throttled, otherwise, false
    */
-  protected boolean throttleforCluster(
+  protected boolean shouldThrottleForCluster(
       StateTransitionThrottleConfig.RebalanceType rebalanceType) {
-    if (throttleForANYType(_pendingTransitionAllowedInCluster)) {
+    if (shouldThrottleForANYType(_pendingTransitionAllowedInCluster)) {
       return true;
     }
-
     Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
-    if (clusterThrottle != null) {
-      if (clusterThrottle <= 0) {
-        return true;
-      }
-    }
-
-    return false;
+    return clusterThrottle != null && clusterThrottle <= 0;
   }
 
-  protected boolean throttleforResource(
+  /**
+   * Check if state transitions for a particular Rebalance type must be throttled at the resource
+   * level. Assuming the "charging" already happened at this level and the throttle value did not
+   * dip below 0 at the higher level, this method purely checks whether the throttle value has
+   * reached 0 or below.
+   * @return true if it should be throttled, otherwise, false
+   */
+  protected boolean shouldThrottleForResource(
       StateTransitionThrottleConfig.RebalanceType rebalanceType, String resourceName) {
-    if (throttleforCluster(rebalanceType)) {
+    if (shouldThrottleForCluster(rebalanceType)) {
       return true;
     }
-
     Long resourceThrottle;
     if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
       resourceThrottle = _pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
-      if (throttleForANYType(_pendingTransitionAllowedPerResource.get(resourceName)) || (
-          resourceThrottle != null && resourceThrottle <= 0)) {
+      if (shouldThrottleForANYType(_pendingTransitionAllowedPerResource.get(resourceName))
+          || (resourceThrottle != null && resourceThrottle <= 0)) {
         return true;
       }
     }
-
     return false;
   }
 
-  protected boolean throttleForInstance(
+  /**
+   * Check if state transitions for a particular Rebalance type must be throttled at the instance
+   * level. Assuming the "charging" already happened at this level and the throttle value did not
+   * dip below 0 at the higher level, this method purely checks whether the throttle value has
+   * reached 0 or below.
+   * @return true if it should be throttled, otherwise, false
+   */
+  protected boolean shouldThrottleForInstance(
       StateTransitionThrottleConfig.RebalanceType rebalanceType, String instanceName) {
-    if (throttleforCluster(rebalanceType)) {
+    if (shouldThrottleForCluster(rebalanceType)) {
       return true;
     }
-
     Long instanceThrottle;
     if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
       instanceThrottle = _pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
-      if (throttleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName)) || (
-          instanceThrottle != null && instanceThrottle <= 0)) {
+      if (shouldThrottleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName))
+          || (instanceThrottle != null && instanceThrottle <= 0)) {
         return true;
       }
     }
-
     return false;
   }
 
+  /**
+   * "Charge" for a pending state for a particular Rebalance type by subtracting one pending state
+   * from number of total pending states allowed (set by user application).
+   * @param rebalanceType
+   */
   protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) {
     if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) {
       Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
@@ -159,6 +184,11 @@ class StateTransitionThrottleController {
     }
   }
 
+  /**
+   * "Charge" the given resource for a pending state of a particular Rebalance type by subtracting
+   * one pending state from number of total pending states allowed (set by user application).
+   * @param rebalanceType
+   */
   protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType,
       String resource) {
     if (_pendingTransitionAllowedPerResource.containsKey(resource)
@@ -171,6 +201,11 @@ class StateTransitionThrottleController {
     }
   }
 
+  /**
+   * "Charge" the given instance for a pending state of a particular Rebalance type by subtracting
+   * one pending state from number of total pending states allowed (set by user application).
+   * @param rebalanceType
+   */
   protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType,
       String instance) {
     if (_pendingTransitionAllowedPerInstance.containsKey(instance)
@@ -183,7 +218,14 @@ class StateTransitionThrottleController {
     }
   }
 
-  private boolean throttleForANYType(
+  /**
+   * Check if state transitions must be throttled overall regardless of the rebalance type.
+   * Assuming the "charging" already happened, this method purely checks whether the throttle value
+   * has reached 0 or below.
+   * @param pendingTransitionAllowed
+   * @return true if it should be throttled, otherwise, false
+   */
+  private boolean shouldThrottleForANYType(
       Map<StateTransitionThrottleConfig.RebalanceType, Long> pendingTransitionAllowed) {
     if (pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY)) {
       Long anyTypeThrottle =
@@ -195,16 +237,21 @@ class StateTransitionThrottleController {
     return false;
   }
 
+  /**
+   * "Charge" for a pending state regardless of the rebalance type by subtracting one pending state
+   * from the number of total pending states allowed (set by user
+   * application).
+   * @param pendingTransitionAllowed
+   */
   private void chargeANYType(
       Map<StateTransitionThrottleConfig.RebalanceType, Long> pendingTransitionAllowed) {
     if (pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY)) {
       Long anyTypeThrottle =
           pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
       if (anyTypeThrottle > 0) {
-        pendingTransitionAllowed
-            .put(StateTransitionThrottleConfig.RebalanceType.ANY, anyTypeThrottle - 1);
+        pendingTransitionAllowed.put(StateTransitionThrottleConfig.RebalanceType.ANY,
+            anyTypeThrottle - 1);
       }
     }
   }
-}
-
+}
\ No newline at end of file


Mime
View raw message