helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch wagedRebalancer updated: Refine the WAGED rebalancer to minimize the partial rebalance workload. (#639)
Date Thu, 19 Dec 2019 19:08:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 138d1c6  Refine the WAGED rebalancer to minimize the partial rebalance workload. (#639)
138d1c6 is described below

commit 138d1c61b3e144f1ebf4891b6eb656704952c1e1
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Thu Dec 19 11:08:24 2019 -0800

    Refine the WAGED rebalancer to minimize the partial rebalance workload. (#639)
    
    * Refine the WAGED rebalancer to minimize the partial rebalance workload.
    
    Split the cluster model calculation method so that each rebalance type (global baseline vs. partial) can use its own logic to compute the rebalance scope.
    Also, refine the WAGED rebalancer logic to reduce duplicate code.
---
 .../changedetector/ResourceChangeDetector.java     |  18 ++
 .../rebalancer/waged/WagedRebalancer.java          | 237 +++++++-------
 .../waged/model/ClusterModelProvider.java          | 342 +++++++++++++++++----
 .../stages/CurrentStateComputationStage.java       |   2 +-
 .../BestPossibleExternalViewVerifier.java          |  69 +++--
 .../rebalancer/waged/TestWagedRebalancer.java      |  19 +-
 .../waged/model/TestClusterModelProvider.java      | 176 +++++++++--
 7 files changed, 624 insertions(+), 239 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 8402efd..14fb750 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 import org.apache.helix.HelixConstants;
@@ -173,4 +175,20 @@ public class ResourceChangeDetector implements ChangeDetector {
         changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
             determinePropertyMapByType(changeType, _newSnapshot)));
   }
+
+  /**
+   * @return A map contains all the changed items that are categorized by the change types.
+   */
+  public Map<HelixConstants.ChangeType, Set<String>> getAllChanges() {
+    return getChangeTypes().stream()
+        .collect(Collectors.toMap(changeType -> changeType, changeType -> {
+          Set<String> itemKeys = new HashSet<>();
+          itemKeys.addAll(getAdditionsByType(changeType));
+          itemKeys.addAll(getChangesByType(changeType));
+          itemKeys.addAll(getRemovalsByType(changeType));
+          return itemKeys;
+        })).entrySet().stream().filter(changeEntry -> !changeEntry.getValue().isEmpty()).collect(
+            Collectors
+                .toMap(changeEntry -> changeEntry.getKey(), changeEntry -> changeEntry.getValue()));
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 06aadca..b05287e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -94,6 +94,8 @@ public class WagedRebalancer {
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
+  // Note, the rebalance algorithm field is mutable so it should not be directly referred except for
+  // the public method computeNewIdealStates.
   private RebalanceAlgorithm _rebalanceAlgorithm;
   private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
       NOT_CONFIGURED_PREFERENCE;
@@ -242,7 +244,8 @@ public class WagedRebalancer {
     Map<String, IdealState> newIdealStates;
     try {
       // Calculate the target assignment based on the current cluster status.
-      newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput);
+      newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput,
+          _rebalanceAlgorithm);
     } catch (HelixRebalanceException ex) {
       LOG.error("Failed to calculate the new assignments.", ex);
       // Record the failure in metrics.
@@ -295,7 +298,7 @@ public class WagedRebalancer {
   // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
   private Map<String, IdealState> computeBestPossibleStates(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput)
+      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
     Set<String> activeNodes = DelayedRebalanceUtil
         .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
@@ -306,14 +309,15 @@ public class WagedRebalancer {
     delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
 
     Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
-        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput));
+        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput,
+            algorithm));
 
     // The additional rebalance overwrite is required since the calculated mapping may contain
     // some delayed rebalanced assignments.
     if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
       applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
-              resourceMap.keySet()));
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()),
+          algorithm);
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final mapping after calculation.
@@ -325,32 +329,17 @@ public class WagedRebalancer {
     return newIdealStates;
   }
 
-  // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+  // Coordinate global rebalance and partial rebalance according to the cluster changes.
   protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+      RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
-    getChangeDetector().updateSnapshots(clusterData);
-    // Get all the changed items' information. Filter for the items that have content changed.
-    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
-        getChangeDetector().getChangeTypes().stream()
-            .collect(Collectors.toMap(changeType -> changeType, changeType -> {
-              Set<String> itemKeys = new HashSet<>();
-              itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType));
-              itemKeys.addAll(getChangeDetector().getChangesByType(changeType));
-              itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType));
-              return itemKeys;
-            })).entrySet().stream().filter(changeEntry -> !changeEntry.getValue().isEmpty())
-            .collect(Collectors
-                .toMap(changeEntry -> changeEntry.getKey(), changeEntry -> changeEntry.getValue()));
-
-    // Perform Global Baseline Calculation
-    refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
-
-    // Perform partial rebalance
+    // Perform global rebalance for a new baseline assignment
+    globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
+    // Perform partial rebalance for a new best possible assignment
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);
-
+        partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
     return newAssignment;
   }
 
@@ -388,50 +377,80 @@ public class WagedRebalancer {
     return finalIdealStateMap;
   }
 
-  // TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
-  private void refreshBaseline(ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput)
+  /**
+   * Global rebalance calculates for a new baseline assignment.
+   * The new baseline assignment will be persisted and leveraged by the partial rebalance.
+   * @param clusterData
+   * @param resourceMap
+   * @param currentStateOutput
+   * @param algorithm
+   * @throws HelixRebalanceException
+   */
+  private void globalRebalance(ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
+      RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    _changeDetector.updateSnapshots(clusterData);
+    // Get all the changed items' information. Filter for the items that have content changed.
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
+        _changeDetector.getAllChanges();
+
     if (clusterChanges.keySet().stream()
         .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
-      LOG.info("Start calculating the new baseline.");
-      _globalBaselineCalcCounter.increment(1L);
-      _globalBaselineCalcLatency.startMeasuringLatency();
-
-      // For baseline calculation
+      // Build the cluster model for rebalance calculation.
+      // Note, for a Baseline calculation,
       // 1. Ignore node status (disable/offline).
-      // 2. Use the baseline as the previous best possible assignment since there is no "baseline" for
-      // the baseline.
-      // Read the baseline from metadata store
+      // 2. Use the previous Baseline as the only parameter about the previous assignment.
       Map<String, ResourceAssignment> currentBaseline =
           getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-      Map<String, ResourceAssignment> newBaseline =
-          calculateAssignment(clusterData, clusterChanges, resourceMap,
-              clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
-
-      // Write the new baseline to metadata store
-      if (_assignmentMetadataStore != null) {
-        try {
-          _writeLatency.startMeasuringLatency();
-          _assignmentMetadataStore.persistBaseline(newBaseline);
-          _writeLatency.endMeasuringLatency();
-        } catch (Exception ex) {
-          throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
-              HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-        }
-      } else {
-        LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
+      ClusterModel clusterModel;
+      try {
+        clusterModel = ClusterModelProvider
+            .generateClusterModelForBaseline(clusterData, resourceMap,
+                clusterData.getAllInstances(), clusterChanges, currentBaseline);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
       }
-      _globalBaselineCalcLatency.endMeasuringLatency();
-      LOG.info("Finish calculating the new baseline.");
+
+      calculateAndUpdateBaseline(clusterModel, algorithm);
     }
   }
 
+  /**
+   * Calculate and update the Baseline assignment
+   * @param clusterModel
+   * @param algorithm
+   * @throws HelixRebalanceException
+   */
+  private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    _globalBaselineCalcCounter.increment(1L);
+    _globalBaselineCalcLatency.startMeasuringLatency();
+
+    Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
+    // Write the new baseline to metadata store
+    if (_assignmentMetadataStore != null) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
+    }
+    _globalBaselineCalcLatency.endMeasuringLatency();
+    LOG.info("Finish calculating the new baseline.");
+  }
+
   private Map<String, ResourceAssignment> partialRebalance(
-      ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+      RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
     _partialRebalanceCounter.increment(1L);
@@ -442,12 +461,19 @@ public class WagedRebalancer {
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
 
     // Read the best possible assignment from metadata store
-    Map<String, ResourceAssignment> currentBestPossibleAssignment = getBestPossibleAssignment(
-        _assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-
-    // Compute the new assignment
-    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
-        resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment);
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel = ClusterModelProvider
+          .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
+              currentBaseline, currentBestPossibleAssignment);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterModel, algorithm);
 
     // Asynchronously report baseline divergence metric before persisting to metadata store,
     // just in case if persisting fails, we still have the metric.
@@ -464,8 +490,6 @@ public class WagedRebalancer {
     if (_assignmentMetadataStore != null) {
       try {
         _writeLatency.startMeasuringLatency();
-        // TODO Test to confirm if persisting the final assignment (with final partition states)
-        // would be a better option.
         _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
         _writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
@@ -473,7 +497,7 @@ public class WagedRebalancer {
             HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
       }
     } else {
-      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline assignment.");
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
     }
     _partialRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish calculating the new best possible assignment.");
@@ -481,45 +505,23 @@ public class WagedRebalancer {
   }
 
   /**
-   * Generate the cluster model based on the input and calculate the optimal assignment.
-   * @param clusterData the cluster data cache.
-   * @param clusterChanges the detected cluster changes.
-   * @param resourceMap the rebalancing resources.
-   * @param activeNodes the alive and enabled nodes.
-   * @param baseline the baseline assignment for the algorithm as a reference.
-   * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
-   *          reference.
+   * @param clusterModel the cluster model that contains all the cluster status for the purpose of
+   *                     rebalancing.
    * @return the new optimal assignment for the resources.
    */
-  private Map<String, ResourceAssignment> calculateAssignment(
-      ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, Map<String, ResourceAssignment> baseline,
-      Map<String, ResourceAssignment> prevBestPossibleAssignment) throws HelixRebalanceException {
+  private Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
     long startTime = System.currentTimeMillis();
-    LOG.info("Start calculating for an assignment");
-    ClusterModel clusterModel;
-    try {
-      clusterModel = ClusterModelProvider.generateClusterModel(clusterData, resourceMap,
-          activeNodes, clusterChanges, baseline, prevBestPossibleAssignment);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to generate cluster model.",
-          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-    }
-
-    OptimalAssignment optimalAssignment = _rebalanceAlgorithm.calculate(clusterModel);
+    LOG.info("Start calculating for an assignment with algorithm {}",
+        algorithm.getClass().getSimpleName());
+    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
     Map<String, ResourceAssignment> newAssignment =
         optimalAssignment.getOptimalResourceAssignment();
-
-    LOG.info("Finish calculating an assignment. Took: {} ms.", System.currentTimeMillis() - startTime);
-
+    LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.",
+        algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
     return newAssignment;
   }
 
-  private ResourceChangeDetector getChangeDetector() {
-    return _changeDetector;
-  }
-
   // Generate the preference lists from the state mapping based on state priority.
   private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
       Map<String, Integer> statePriorityMap) {
@@ -635,10 +637,11 @@ public class WagedRebalancer {
       Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
       offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
       for (String resource : resourceSet) {
-        DelayedRebalanceUtil.setRebalanceScheduler(resource, delayedRebalanceEnabled,
-            offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
-            clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
-            clusterConfig.getRebalanceDelayTime(), clusterConfig, _manager);
+        DelayedRebalanceUtil
+            .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
+                clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+                clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
+                clusterConfig, _manager);
       }
     } else {
       LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
@@ -653,17 +656,26 @@ public class WagedRebalancer {
    * @param idealStateMap the calculated ideal states.
    * @param clusterData the cluster data cache.
    * @param resourceMap the rebalanaced resource map.
-   * @param baseline the baseline assignment
+   * @param baseline the baseline assignment.
+   * @param algorithm the rebalance algorithm.
    */
   private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
-    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
-    // Note that the calculation used the baseline as the input only. This is for minimizing
-    // unnecessary partition movement.
-    Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
-        calculateAssignment(clusterData, Collections.emptyMap(), resourceMap, enabledLiveInstances,
-            Collections.emptyMap(), baseline));
+      Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    ClusterModel clusterModel;
+    try {
+      // Note this calculation uses the baseline as the best possible assignment input here.
+      // This is for minimizing unnecessary partition movement.
+      clusterModel = ClusterModelProvider
+          .generateClusterModelFromExistingAssignment(clusterData, resourceMap, baseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException(
+          "Failed to generate cluster model for delayed rebalance overwrite.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+    Map<String, IdealState> activeIdealStates =
+        convertResourceAssignment(clusterData, calculateAssignment(clusterModel, algorithm));
     for (String resourceName : idealStateMap.keySet()) {
       // The new calculated ideal state before overwrite
       IdealState newIdealState = idealStateMap.get(resourceName);
@@ -676,6 +688,7 @@ public class WagedRebalancer {
       IdealState newActiveIdealState = activeIdealStates.get(resourceName);
       // The current ideal state that exists in the IdealState znode
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
+      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
       int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica =
           DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index dc36fba..bfb8750 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,11 +19,13 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -43,7 +45,19 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public class ClusterModelProvider {
 
+  private enum RebalanceScopeType {
+    // Set the rebalance scope to cover the difference between the current assignment and the
+    // Baseline assignment only.
+    PARTIAL,
+    // Set the rebalance scope to cover all replicas that need relocation based on the cluster
+    // changes.
+    GLOBAL_BASELINE
+  }
+
   /**
+   * Generate a new Cluster Model object according to the current cluster status for partial
+   * rebalance. The rebalance scope is configured for recovering the missing replicas that are in
+   * the Baseline assignment but not in the current Best possible assignment only.
    * @param dataProvider           The controller's data cache.
    * @param resourceMap            The full list of the resources to be rebalanced. Note that any
    *                               resources that are not in this list will be removed from the
@@ -51,17 +65,77 @@ public class ClusterModelProvider {
    * @param activeInstances        The active instances that will be used in the calculation.
    *                               Note this list can be different from the real active node list
    *                               according to the rebalancer logic.
-   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
    * @param baselineAssignment     The persisted Baseline assignment.
    * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
    *                               previous rebalance.
-   * @return Generate a new Cluster Model object according to the current cluster status.
+   * @return
    */
-  public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+  public static ClusterModel generateClusterModelForPartialRebalance(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      Set<String> activeInstances, Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
+        baselineAssignment, bestPossibleAssignment, RebalanceScopeType.PARTIAL);
+  }
+
+  /**
+   * Generate a new Cluster Model object according to the current cluster status for the Baseline
+   * calculation. The rebalance scope is determined according to the cluster changes.
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param allInstances           All the instances that will be used in the calculation.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param baselineAssignment     The previous Baseline assignment.
+   * @return the new cluster model
+   */
+  public static ClusterModel generateClusterModelForBaseline(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      Set<String> allInstances, Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, allInstances, clusterChanges,
+        Collections.emptyMap(), baselineAssignment, RebalanceScopeType.GLOBAL_BASELINE);
+  }
+
+  /**
+   * Generate a cluster model based on the current state output and data cache. The rebalance scope
+   * is configured for recovering the missing replicas only.
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param currentStateAssignment The resource assignment built from current state output.
+   * @return the new cluster model
+   */
+  public static ClusterModel generateClusterModelFromExistingAssignment(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      Map<String, ResourceAssignment> currentStateAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
+        Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment,
+        RebalanceScopeType.GLOBAL_BASELINE);
+  }
+
+  /**
+   * Generate a new Cluster Model object according to the current cluster status.
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in the calculation.
+   *                               Note this list can be different from the real active node list
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param idealAssignment        The ideal assignment.
+   * @param currentAssignment      The current assignment that was generated in the previous rebalance.
+   * @param scopeType              Specify how to determine the rebalance scope.
+   * @return the new cluster model
+   */
+  private static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
       Map<String, Resource> resourceMap, Set<String> activeInstances,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
-      Map<String, ResourceAssignment> baselineAssignment,
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
+      Map<String, ResourceAssignment> idealAssignment,
+      Map<String, ResourceAssignment> currentAssignment, RebalanceScopeType scopeType) {
     // Construct all the assignable nodes and initialize with the allocated replicas.
     Set<AssignableNode> assignableNodes =
         parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
@@ -75,9 +149,25 @@ public class ClusterModelProvider {
     // Check if the replicas need to be reassigned.
     Map<String, Set<AssignableReplica>> allocatedReplicas =
         new HashMap<>(); // <instanceName, replica set>
-    Set<AssignableReplica> toBeAssignedReplicas =
-        findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
-            dataProvider.getLiveInstances().keySet(), bestPossibleAssignment, allocatedReplicas);
+    Set<AssignableReplica> toBeAssignedReplicas;
+    switch (scopeType) {
+      case GLOBAL_BASELINE:
+        toBeAssignedReplicas = findToBeAssignedReplicasByClusterChanges(replicaMap, activeInstances,
+            dataProvider.getLiveInstances().keySet(), clusterChanges, currentAssignment,
+            allocatedReplicas);
+        break;
+      case PARTIAL:
+        // Filter to remove the replicas that do not exist in the ideal assignment given but exist
+        // in the replicaMap. This is because such replicas are new additions that do not need to be
+        // rebalanced right away.
+        retainExistingReplicas(replicaMap, idealAssignment);
+        toBeAssignedReplicas =
+            findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeInstances,
+                idealAssignment, currentAssignment, allocatedReplicas);
+        break;
+      default:
+        throw new HelixException("Unknown rebalance scope type: " + scopeType);
+    }
 
     // Update the allocated replicas to the assignable nodes.
     assignableNodes.parallelStream().forEach(node -> node.assignInitBatch(
@@ -86,49 +176,156 @@ public class ClusterModelProvider {
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
         replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
-        assignableNodes, baselineAssignment, bestPossibleAssignment);
+        assignableNodes, idealAssignment, currentAssignment);
     // Initial the cluster context with the allocated assignments.
     context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
 
     return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
   }
 
+  // Filter replicaMap in place so that only the replicas with a matching <partition, state>
+  // allocation in assignmentMap remain. Each allocation can be matched by at most one replica.
+  private static void retainExistingReplicas(Map<String, Set<AssignableReplica>> replicaMap,
+      Map<String, ResourceAssignment> assignmentMap) {
+    replicaMap.entrySet().parallelStream().forEach(replicaSetEntry -> {
+      // <partition, <state, instances set>> of this resource; empty if it has no assignment.
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          getStateInstanceMap(assignmentMap.get(replicaSetEntry.getKey()));
+      // Walk the resource's replicas and drop the ones without an unclaimed matching allocation.
+      Iterator<AssignableReplica> replicaIter = replicaSetEntry.getValue().iterator();
+      while (replicaIter.hasNext()) {
+        AssignableReplica replica = replicaIter.next();
+        Set<String> validInstances =
+            stateInstanceMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+                .getOrDefault(replica.getReplicaState(), Collections.emptySet());
+        if (validInstances.isEmpty()) {
+          // No (remaining) allocation in the assignment map matches this replica; remove it.
+          replicaIter.remove();
+        } else {
+          // Claim one matching instance so that it won't be matched again by another replica
+          // of the same partition and state as we loop through all replicas.
+          validInstances.remove(validInstances.iterator().next());
+        }
+      }
+    });
+  }
+
   /**
-   * Generate a cluster model based on the current state output and data cache.
-   * @param dataProvider           The controller's data cache.
-   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
-   *                               resources that are not in this list will be removed from the
-   *                               final assignment.
-   * @param currentStateAssignment The resource assignment built from current state output.
-   * @return A cluster model based on the current state and data cache.
+   * Find the minimum set of replicas that need to be reassigned by comparing the current assignment
+   * with the ideal assignment.
+   * A replica needs to be reassigned or newly assigned if either of the following conditions is true:
+   * 1. The partition allocation (the instance the replica is placed on) differs between the ideal
+   * assignment and the current assignment, and the allocation in the ideal assignment is valid,
+   * so it is worthwhile to move the replica.
+   * 2. The partition allocation is in neither the ideal assignment nor the current assignment, or
+   * those allocations are not valid due to offline or disabled instances.
+   * Otherwise, the rebalancer just keeps the current assignment allocation.
+   *
+   * @param replicaMap             A map that contains all the replicas grouped by resource name.
+   * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
+   * @param idealAssignment        The ideal assignment.
+   * @param currentAssignment      The current assignment that was generated in the previous rebalance.
+   * @param allocatedReplicas      Output map of <Instance -> replicas>, filled with the replicas that keep their allocation, grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
    */
-  public static ClusterModel generateClusterModelFromCurrentState(
-      ResourceControllerDataProvider dataProvider,
-      Map<String, Resource> resourceMap,
-      Map<String, ResourceAssignment> currentStateAssignment) {
-    return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
-        Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment);
+  private static Set<AssignableReplica> findToBeAssignedReplicasByComparingWithIdealAssignment(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> idealAssignment,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    // check each resource to identify the allocated replicas and to-be-assigned replicas.
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> idealPartitionStateMap =
+          getValidStateInstanceMap(idealAssignment.get(resourceName), activeInstances);
+      Map<String, Map<String, Set<String>>> currentPartitionStateMap =
+          getValidStateInstanceMap(currentAssignment.get(resourceName), activeInstances);
+      // Iterate the replicas of the resource to find the ones that require reallocating.
+      for (AssignableReplica replica : replicaMap.get(resourceName)) {
+        String partitionName = replica.getPartitionName();
+        String replicaState = replica.getReplicaState();
+        Set<String> idealAllocations =
+            idealPartitionStateMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+        Set<String> currentAllocations =
+            currentPartitionStateMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+
+        // Instances that hold this <partition, state> in both the current and ideal assignments.
+        List<String> commonAllocations = new ArrayList<>(currentAllocations);
+        commonAllocations.retainAll(idealAllocations);
+        if (!commonAllocations.isEmpty()) {
+          // 1. If the partition is allocated at the same location in both ideal and current
+          // assignments, there is no need to reassign it.
+          String allocatedInstance = commonAllocations.get(0);
+          allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new HashSet<>()).add(replica);
+          // Remove the instance from the record to prevent this instance from being processed twice.
+          idealAllocations.remove(allocatedInstance);
+          currentAllocations.remove(allocatedInstance);
+        } else if (!idealAllocations.isEmpty()) {
+          // 2. If the partition is allocated at an active instance in the ideal assignment but the
+          // same allocation does not exist in the current assignment, try to rebalance the replica
+          // or assign it if the replica has not been assigned.
+          // There are two possible conditions,
+          // * This replica has been newly added and has not been assigned yet, so it appears in
+          // the ideal assignment and does not appear in the current assignment.
+          // * The allocation of this replica in the ideal assignment has been updated due to a
+          // cluster change. For example, new instance is added. So the old allocation in the
+          // current assignment might be sub-optimal.
+          // In either condition, we add it to toBeAssignedReplicas so that it will get assigned.
+          toBeAssignedReplicas.add(replica);
+          // Remove the pending allocation from the idealAllocations after processing so that the
+          // instance won't be double-processed as we loop through all replicas
+          String pendingAllocation = idealAllocations.iterator().next();
+          idealAllocations.remove(pendingAllocation);
+        } else if (!currentAllocations.isEmpty()) {
+          // 3. This replica exists in the current assignment but does not appear, or does not have a
+          // valid allocation, in the ideal assignment.
+          // This means either 1) the ideal assignment actually has this replica allocated on this
+          // instance, but it does not show up because the instance is temporarily offline or
+          // disabled (note that all such instances have been filtered out in an earlier part of the
+          // logic), or 2) the most recent version of the ideal assignment was not fetched
+          // correctly from the assignment metadata store.
+          // In either case, the solution is to keep the current assignment. So put this replica
+          // with the allocated instance into the allocatedReplicas map.
+          String allocatedInstance = currentAllocations.iterator().next();
+          allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new HashSet<>()).add(replica);
+          // Remove the instance from the record to prevent the same location being processed again.
+          currentAllocations.remove(allocatedInstance);
+        } else {
+          // 4. This replica is not found in either the ideal assignment or the current assignment
+          // with a valid allocation. This implies that the replica was newly added but was never
+          // assigned in reality or was added so recently that it hasn't shown up in the ideal
+          // assignment (because its calculation takes longer and is asynchronously calculated).
+          // In that case, we add it to toBeAssignedReplicas so that it will get assigned as a
+          // result of partialRebalance.
+          toBeAssignedReplicas.add(replica);
+        }
+      }
+    }
+    return toBeAssignedReplicas;
   }
 
   /**
-   * Find the minimum set of replicas that need to be reassigned.
+   * Find the minimum set of replicas that need to be reassigned according to the cluster change.
    * A replica needs to be reassigned if one of the following condition is true:
    * 1. Cluster topology (the cluster config / any instance config) has been updated.
    * 2. The resource config has been updated.
-   * 3. If the current best possible assignment does not contain the partition's valid assignment.
+   * 3. If the current assignment does not contain the partition's valid assignment.
    *
    * @param replicaMap             A map contains all the replicas grouped by resource name.
-   * @param clusterChanges         A map contains all the important metadata updates that happened after the previous rebalance.
    * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
    * @param liveInstances          All the instances that are live.
-   * @param bestPossibleAssignment The current best possible assignment.
+   * @param clusterChanges         A map that contains all the important metadata updates that happened after the previous rebalance.
+   * @param currentAssignment      The current replica assignment.
    * @param allocatedReplicas      Return the allocated replicas grouped by the target instance name.
    * @return The replicas that need to be reassigned.
    */
-  private static Set<AssignableReplica> findToBeAssignedReplicas(
-      Map<String, Set<AssignableReplica>> replicaMap,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
-      Set<String> liveInstances, Map<String, ResourceAssignment> bestPossibleAssignment,
+  private static Set<AssignableReplica> findToBeAssignedReplicasByClusterChanges(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Set<String> liveInstances, Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> currentAssignment,
       Map<String, Set<AssignableReplica>> allocatedReplicas) {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
@@ -151,42 +348,39 @@ public class ClusterModelProvider {
           .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
     } else {
       // check each resource to identify the allocated replicas and to-be-assigned replicas.
-      for (String resourceName : replicaMap.keySet()) {
-        Set<AssignableReplica> replicas = replicaMap.get(resourceName);
+      for (Map.Entry<String, Set<AssignableReplica>> replicaMapEntry : replicaMap.entrySet()) {
+        String resourceName = replicaMapEntry.getKey();
+        Set<AssignableReplica> replicas = replicaMapEntry.getValue();
         // 1. if the resource config/idealstate is changed, need to reassign.
-        // 2. if the resource does appear in the best possible assignment, need to reassign.
+        // 2. if the resource does not appear in the current assignment, need to reassign.
         if (clusterChanges
             .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
             .contains(resourceName) || clusterChanges
             .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
-            .contains(resourceName) || !bestPossibleAssignment.containsKey(resourceName)) {
+            .contains(resourceName) || !currentAssignment.containsKey(resourceName)) {
           toBeAssignedReplicas.addAll(replicas);
           continue; // go to check next resource
         } else {
-          // check for every best possible assignments to identify if the related replicas need to reassign.
-          ResourceAssignment assignment = bestPossibleAssignment.get(resourceName);
-          // <partition, <instance, state>>
-          Map<String, Map<String, String>> stateMap = assignment.getMappedPartitions().stream()
-              .collect(Collectors.toMap(partition -> partition.getPartitionName(),
-                  partition -> new HashMap<>(assignment.getReplicaMap(partition))));
+          // check for every replica assignment to identify if the related replicas need to be reassigned.
+          // <partition, <state, instances list>>
+          Map<String, Map<String, Set<String>>> stateMap =
+              getValidStateInstanceMap(currentAssignment.get(resourceName), activeInstances);
           for (AssignableReplica replica : replicas) {
             // Find any ACTIVE instance allocation that has the same state with the replica
-            Optional<Map.Entry<String, String>> instanceNameOptional =
-                stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap()).entrySet()
-                    .stream().filter(instanceStateMap ->
-                    instanceStateMap.getValue().equals(replica.getReplicaState()) && activeInstances
-                        .contains(instanceStateMap.getKey())).findAny();
-            // 3. if no such an instance in the bestPossible assignment, need to reassign the replica
-            if (!instanceNameOptional.isPresent()) {
+            Set<String> validInstances =
+                stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+                    .getOrDefault(replica.getReplicaState(), Collections.emptySet());
+            if (validInstances.isEmpty()) {
+              // 3. if no such an instance in the current assignment, need to reassign the replica
               toBeAssignedReplicas.add(replica);
               continue; // go to check the next replica
             } else {
-              String instanceName = instanceNameOptional.get().getKey();
-              // * cleanup the best possible state map record,
-              // * so the selected instance won't be picked up again for the another replica check
-              stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
-                  .remove(instanceName);
-              // the current best possible assignment for this replica is valid,
+              Iterator<String> iter = validInstances.iterator();
+              // Remove the instance from the current allocation record after processing so that it
+              // won't be double-processed as we loop through all replicas
+              String instanceName = iter.next();
+              iter.remove();
+              // the current assignment for this replica is valid,
               // add to the allocated replica list.
               allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
             }
@@ -198,19 +392,51 @@ public class ClusterModelProvider {
   }
 
   /**
+   * Filter to remove all invalid allocations that are not on the active instances.
+   * @param assignment      The resource assignment to parse. A null assignment yields an empty map.
+   * @param activeInstances The instances whose allocations are considered valid.
+   * @return A map of <partition, <state, instances set>> that only contains active instances.
+   */
+  private static Map<String, Map<String, Set<String>>> getValidStateInstanceMap(
+      ResourceAssignment assignment, Set<String> activeInstances) {
+    Map<String, Map<String, Set<String>>> stateInstanceMap = getStateInstanceMap(assignment);
+    stateInstanceMap.values().forEach(stateMap -> stateMap.values()
+        .forEach(instanceSet -> instanceSet.retainAll(activeInstances)));
+    return stateInstanceMap;
+  }
+
+  // Convert an assignment into a <partition, <state, instances set>> map.
+  // A null assignment produces an immutable empty map; otherwise the per-partition maps and the
+  // instance sets are freshly allocated HashMap/HashSet instances, so callers may mutate them.
+  private static Map<String, Map<String, Set<String>>> getStateInstanceMap(
+      ResourceAssignment assignment) {
+    if (assignment == null) {
+      return Collections.emptyMap();
+    }
+    return assignment.getMappedPartitions().stream().collect(Collectors.toMap(
+        partition -> partition.getPartitionName(), partition -> {
+          Map<String, Set<String>> instancesByState = new HashMap<>();
+          assignment.getReplicaMap(partition).forEach((instanceName, state) -> instancesByState
+              .computeIfAbsent(state, key -> new HashSet<>()).add(instanceName));
+          return instancesByState;
+        }));
+  }
+
+  /**
    * Parse all the nodes that can be assigned replicas based on the configurations.
    *
    * @param clusterConfig     The cluster configuration.
    * @param instanceConfigMap A map of all the instance configuration.
+   *                          Active instances without a configuration entry are skipped.
    * @param activeInstances   All the instances that are online and enabled.
    * @return A map of assignable node set, <InstanceName, node set>.
    */
   private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
       Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
-    return activeInstances.parallelStream().map(
-        instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
-            instanceName))
-        .collect(Collectors.toSet());
+    return activeInstances.parallelStream().filter(instanceConfigMap::containsKey)
+        .map(instanceName -> new AssignableNode(clusterConfig,
+            instanceConfigMap.get(instanceName), instanceName))
+        .collect(Collectors.toSet());
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 890fa57..9ad8fcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -254,7 +254,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
         Map<String, ResourceAssignment> currentStateAssignment =
             currentStateOutput.getAssignment(resourceToMonitorMap.keySet());
-        ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromCurrentState(
+        ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
             dataProvider, resourceToMonitorMap, currentStateAssignment);
 
         Map<String, Double> maxUsageMap = new HashMap<>();
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 4118ccf..74554e9 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -37,6 +37,7 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
 import org.apache.helix.controller.stages.AttributeName;
@@ -413,45 +414,45 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
-}
 
-/**
- * A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status but
- * never update the rebalancer assignment metadata.
- * This rebalacer is used in the verifiers or tests.
- */
-class DryrunWagedRebalancer extends WagedRebalancer {
-  DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
-        ConstraintBasedAlgorithmFactory.getInstance(preferences));
-  }
+  /**
+   * A dry-run WAGED rebalancer that only calculates the assignment based on the cluster status and
+   * never persists the rebalancer assignment metadata (it uses ReadOnlyAssignmentMetadataStore).
+   * This rebalancer is used in the verifiers or tests.
+   */
+  private static class DryrunWagedRebalancer extends WagedRebalancer {
+    DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
+        Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+      super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
+          ConstraintBasedAlgorithmFactory.getInstance(preferences));
+    }
 
-  @Override
-  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
-      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, CurrentStateOutput currentStateOutput)
-      throws HelixRebalanceException {
-    return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
-        resourceMap.keySet());
+    @Override
+    protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+        ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+        Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
+        throws HelixRebalanceException {
+      return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+          resourceMap.keySet());
+    }
   }
-}
 
-class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
-  ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
-    super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
-  }
+  private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
+    ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+      super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+    }
 
-  @Override
-  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    // Update the in-memory reference only
-    _globalBaseline = globalBaseline;
-  }
+    @Override
+    public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+      // Only update the in-memory reference; the parent's persistence logic is not invoked.
+      _globalBaseline = globalBaseline;
+    }
 
-  @Override
-  public void persistBestPossibleAssignment(
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
-    // Update the in-memory reference only
-    _bestPossibleAssignment = bestPossibleAssignment;
+    @Override
+    public void persistBestPossibleAssignment(
+        Map<String, ResourceAssignment> bestPossibleAssignment) {
+      // Only update the in-memory reference; the parent's persistence logic is not invoked.
+      _bestPossibleAssignment = bestPossibleAssignment;
+    }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 2250539..2b172e8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -114,8 +114,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test
-  public void testRebalance()
-      throws IOException, HelixRebalanceException {
+  public void testRebalance() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
@@ -128,6 +127,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
               .forEach(partition -> resource.addPartition(partition));
           return resource;
         }));
+    // Mocking the change types for triggering a baseline rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+
     Map<String, IdealState> newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
     Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
@@ -150,6 +153,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
               .forEach(partition -> resource.addPartition(partition));
           return resource;
         }));
+    // Mocking the change types for triggering a baseline rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
 
     // Test with partial resources listed in the resourceMap input.
     // Remove the first resource from the input. Note it still exists in the cluster data cache.
@@ -175,6 +181,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
               .forEach(partition -> resource.addPartition(partition));
           return resource;
         }));
+    // Mocking the change types for triggering a baseline rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
 
     // Test with current state exists, so the rebalancer should calculate for the intermediate state
     // Create current state based on the cluster data cache.
@@ -256,12 +265,12 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to generate cluster model. Failure Type: INVALID_CLUSTER_STATUS");
+          "Failed to generate cluster model for partial rebalance. Failure Type: INVALID_CLUSTER_STATUS");
     }
 
     // The rebalance will be done with empty mapping result since there is no previously calculated
@@ -321,7 +330,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Calculation will fail
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index ad608b6..7a3ff22 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -98,10 +98,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
   public void testGenerateClusterModel() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     // 1. test generating a cluster model with empty assignment
-    ClusterModel clusterModel = ClusterModelProvider.generateClusterModel(testCache,
+    ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
         _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+        _instances, Collections.emptyMap(), Collections.emptyMap());
     // There should be no existing assignment.
     Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
@@ -119,19 +119,20 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     // Adjust instance fault zone, so they have different fault zones.
     testCache.getInstanceConfigMap().values().stream()
         .forEach(config -> config.setZoneId(config.getInstanceName()));
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+        _instances, Collections.emptyMap(), Collections.emptyMap());
     // Shall have 2 resources and 12 replicas after fault zone adjusted.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
         .allMatch(replicaSet -> replicaSet.size() == 12));
 
     // 2. test with only one active node
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        Collections.singleton(_testInstanceId), Collections.emptyMap(), Collections.emptyMap(),
-        Collections.emptyMap());
+        Collections.singleton(_testInstanceId), Collections.emptyMap(), Collections.emptyMap());
     // Have only one instance
     Assert.assertEquals(
         clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
@@ -141,19 +142,19 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         .allMatch(replicaSet -> replicaSet.size() == 4));
 
     // 3. test with no active instance
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
-        Collections.emptyMap());
+        Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap());
     // Have only one instance
     Assert.assertEquals(clusterModel.getAssignableNodes().size(), 0);
-    // Shall have 0 assignable replicas because there is only n0 valid node.
+    // Shall have 0 assignable replicas because there are 0 valid nodes.
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
         .allMatch(replicaSet -> replicaSet.isEmpty()));
 
-    // 4. test with best possible assignment
-    // Mock a best possible assignment based on the current states.
-    Map<String, ResourceAssignment> bestPossibleAssignment = new HashMap<>();
+    // 4. test with baseline assignment
+    // Mock a baseline assignment based on the current states.
+    Map<String, ResourceAssignment> baselineAssignment = new HashMap<>();
     for (String resource : _resourceNames) {
       // <partition, <instance, state>>
       Map<String, Map<String, String>> assignmentMap = new HashMap<>();
@@ -166,14 +167,15 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         ResourceAssignment assignment = new ResourceAssignment(resource);
         assignmentMap.keySet().stream().forEach(partition -> assignment
             .addReplicaMap(new Partition(partition), assignmentMap.get(partition)));
-        bestPossibleAssignment.put(resource, assignment);
+        baselineAssignment.put(resource, assignment);
       }
     }
 
     // Generate a cluster model based on the best possible assignment
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.emptyMap(), Collections.emptyMap(), bestPossibleAssignment);
+        _instances, Collections.emptyMap(), baselineAssignment);
     // There should be 4 existing assignments in total (each resource has 2) in the specified instance
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.values().stream()
@@ -186,10 +188,12 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         .allMatch(replicaSet -> replicaSet.size() == 10));
 
     // 5. test with best possible assignment but cluster topology is changed
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.singletonMap(HelixConstants.ChangeType.CLUSTER_CONFIG,
-            Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment);
+        _instances,
+        Collections.singletonMap(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet()),
+        baselineAssignment);
     // There should be no existing assignment since the topology change invalidates all existing assignment
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.isEmpty()));
@@ -203,11 +207,11 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     // 6. test with best possible assignment and one resource config change
     // Generate a cluster model based on the same best possible assignment, but resource1 config is changed
     String changedResourceName = _resourceNames.get(0);
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
         _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
-            Collections.singleton(changedResourceName)), Collections.emptyMap(),
-        bestPossibleAssignment);
+            Collections.singleton(changedResourceName)), baselineAssignment);
     // There should be no existing assignment for all the resource except for resource2
     Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1);
     Map<String, Set<String>> resourceAssignmentMap =
@@ -221,9 +225,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     }
     // Only the first instance will have 2 assignment from resource2.
     for (String instance : _instances) {
-      Assert
-          .assertEquals(clusterModel.getAssignableNodes().get(instance).getAssignedReplicaCount(),
-              instance.equals(_testInstanceId) ? 2 : 0);
+      Assert.assertEquals(clusterModel.getAssignableNodes().get(instance).getAssignedReplicaCount(),
+          instance.equals(_testInstanceId) ? 2 : 0);
     }
     // Shall have 2 resources and 12 replicas
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().keySet().size(), 2);
@@ -236,10 +239,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     // Generate a cluster model based on the best possible assignment, but the assigned node is disabled
     Set<String> limitedActiveInstances = new HashSet<>(_instances);
     limitedActiveInstances.remove(_testInstanceId);
-    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+    clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
+        _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        limitedActiveInstances, Collections.emptyMap(), Collections.emptyMap(),
-        bestPossibleAssignment);
+        limitedActiveInstances, Collections.emptyMap(), baselineAssignment);
     // There should be no existing assignment.
     Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
@@ -253,6 +256,121 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
         .allMatch(replicaSet -> replicaSet.size() == 8));
+  }
 
+  @Test (dependsOnMethods = "testGenerateClusterModel")
+  public void testGenerateClusterModelForPartialRebalance() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    // 1. test generating a cluster model with empty assignment
+    ClusterModel clusterModel = ClusterModelProvider
+        .generateClusterModelForPartialRebalance(testCache, _resourceNames.stream()
+                .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+            _instances, Collections.emptyMap(), Collections.emptyMap());
+    // There should be no existing assignment.
+    Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .anyMatch(resourceMap -> !resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
+    // Have all 3 instances
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), _instances);
+    // Shall have 0 resources and 0 replicas since the baseline is empty. The partial rebalance
+    // should not rebalance any replica.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
+
+    // Adjust instance fault zone, so they have different fault zones.
+    testCache.getInstanceConfigMap().values().stream()
+        .forEach(config -> config.setZoneId(config.getInstanceName()));
+
+    // 2. test with identical best possible and baseline assignments
+    // Mock a best possible assignment based on the current states.
+    Map<String, ResourceAssignment> bestPossibleAssignment = new HashMap<>();
+    for (String resource : _resourceNames) {
+      // <partition, <instance, state>>
+      Map<String, Map<String, String>> assignmentMap = new HashMap<>();
+      CurrentState cs = testCache.getCurrentState(_testInstanceId, _sessionId).get(resource);
+      if (cs != null) {
+        for (Map.Entry<String, String> stateEntry : cs.getPartitionStateMap().entrySet()) {
+          assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new HashMap<>())
+              .put(_testInstanceId, stateEntry.getValue());
+        }
+        ResourceAssignment assignment = new ResourceAssignment(resource);
+        assignmentMap.keySet().stream().forEach(partition -> assignment
+            .addReplicaMap(new Partition(partition), assignmentMap.get(partition)));
+        bestPossibleAssignment.put(resource, assignment);
+      }
+    }
+    Map<String, ResourceAssignment> baseline = new HashMap<>(bestPossibleAssignment);
+    // Generate a cluster model for partial rebalance
+    clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, baseline, bestPossibleAssignment);
+    // There should be 4 existing assignments in total (each resource has 2) in the specified instance
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitionSet -> partitionSet.size() == 2)));
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 4);
+    // Since the best possible matches the baseline, no replica needs to be reassigned.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
+
+    // 3. test with the only assigned instance (in both baseline and best possible) inactive
+    Set<String> partialInstanceList = new HashSet<>(_instances);
+    partialInstanceList.remove(_testInstanceId);
+    clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        partialInstanceList, baseline, bestPossibleAssignment);
+    // Have the other 2 active instances
+    Assert.assertEquals(clusterModel.getAssignableNodes().size(), 2);
+    // All the replicas in the existing assignment should be rebalanced.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 2));
+    // Shall have 0 assigned replicas
+    Assert.assertTrue(clusterModel.getAssignableNodes().values().stream()
+        .allMatch(assignableNode -> assignableNode.getAssignedReplicaCount() == 0));
+
+    // 4. test with one resource that is only in the baseline
+    String resourceInBaselineOnly = _resourceNames.get(0);
+    Map<String, ResourceAssignment> partialBestPossibleAssignment =
+        new HashMap<>(bestPossibleAssignment);
+    partialBestPossibleAssignment.remove(resourceInBaselineOnly);
+    // Generate a cluster model with the adjusted best possible assignment
+    clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, baseline, partialBestPossibleAssignment);
+    // There should be 2 existing assignments in total in the specified instance
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitionSet -> partitionSet.size() == 2)));
+    // Only the replicas of one resource require rebalance
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 2);
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 1);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().containsKey(resourceInBaselineOnly));
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 2));
+
+    // 5. test with one resource only in the best possible assignment
+    String resourceInBestPossibleOnly = _resourceNames.get(1);
+    Map<String, ResourceAssignment> partialBaseline = new HashMap<>(baseline);
+    partialBaseline.remove(resourceInBestPossibleOnly);
+    // Generate a cluster model with the adjusted baseline
+    clusterModel = ClusterModelProvider.generateClusterModelForPartialRebalance(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, partialBaseline, bestPossibleAssignment);
+    // There should be 2 existing assignments in total and none of them require rebalance.
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitionSet -> partitionSet.size() == 2)));
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(), 2);
+    // No need to rebalance the replicas that are not in the baseline yet.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
   }
 }


Mime
View raw message