helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch wagedRebalancer updated: Improve the algorithm so it prioritizes the assignment to the idle nodes when the constraint evaluation results are the same (#651)
Date Thu, 12 Dec 2019 00:02:43 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new b0ba6e3  Improve the algorithm so it prioritizes the assignment to the idle nodes when the constraint evaluation results are the same (#651)
b0ba6e3 is described below

commit b0ba6e3fa1908ad60a3d27958975866b4764fcb9
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Wed Dec 11 16:02:34 2019 -0800

    Improve the algorithm so it prioritizes the assignment to the idle nodes when the constraint evaluation results are the same (#651)
    
    This is to get rid of the randomness when the algorithm result is a tie. Usually, when
    the algorithm picks up the nodes with the same score, more partition movements will be
    triggered on a cluster change.
---
 .../constraints/ConstraintBasedAlgorithm.java      | 41 +++++++++++++++++-----
 1 file changed, 33 insertions(+), 8 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 0956341..dcadff6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.helix.controller.rebalancer.waged.constraints;
  */
 
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,18 +63,22 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
   }
 
   @Override
-  public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException {
+  public OptimalAssignment calculate(ClusterModel clusterModel)
+      throws HelixRebalanceException {
     OptimalAssignment optimalAssignment = new OptimalAssignment();
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
+    Set<String> busyInstances =
+        getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
     // Sort the replicas so the input is stable for the greedy algorithm.
     // For the other algorithm implementation, this sorting could be unnecessary.
     for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
       Optional<AssignableNode> maybeBestNode =
-          getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment);
+          getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
+              optimalAssignment);
       // stop immediately if any replica cannot find best assignable node
       if (optimalAssignment.hasAnyFailure()) {
-        String errorMessage = String.format(
-            "Unable to find any available candidate node for partition %s; Fail reasons: %s",
+        String errorMessage = String
+            .format("Unable to find any available candidate node for partition %s; Fail reasons: %s",
             replica.getPartitionName(), optimalAssignment.getFailures());
         throw new HelixRebalanceException(errorMessage,
             HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -89,7 +93,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
 
   private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica replica,
       List<AssignableNode> assignableNodes, ClusterContext clusterContext,
-      OptimalAssignment optimalAssignment) {
+      Set<String> busyInstances, OptimalAssignment optimalAssignment) {
     Map<AssignableNode, List<HardConstraint>> hardConstraintFailures = new ConcurrentHashMap<>();
     List<AssignableNode> candidateNodes = assignableNodes.parallelStream().filter(candidateNode -> {
       boolean isValid = true;
@@ -113,8 +117,18 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
 
     return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
         getAssignmentNormalizedScore(node, replica, clusterContext)))
-        .max(Comparator.comparingDouble((scoreEntry) -> scoreEntry.getValue()))
-        .map(Map.Entry::getKey);
+        .max((nodeEntry1, nodeEntry2) -> {
+          int scoreCompareResult = nodeEntry1.getValue().compareTo(nodeEntry2.getValue());
+          if (scoreCompareResult == 0) {
+            // If the evaluation scores of 2 nodes are the same, the algorithm assigns the replica
+            // to the idle node first.
+            int idleScore1 = busyInstances.contains(nodeEntry1.getKey().getInstanceName()) ? 0 : 1;
+            int idleScore2 = busyInstances.contains(nodeEntry2.getKey().getInstanceName()) ? 0 : 1;
+            return idleScore1 - idleScore2;
+          } else {
+            return scoreCompareResult;
+          }
+        }).map(Map.Entry::getKey);
   }
 
   private double getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
@@ -200,4 +214,15 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     });
     return orderedAssignableReplicas;
   }
+
+  /**
+   * @param assignments A collection of resource replicas assignment.
+   * @return A set of instance names that have at least one replica assigned in the input assignments.
+   */
+  private Set<String> getBusyInstances(Collection<ResourceAssignment> assignments) {
+    return assignments.stream().flatMap(
+        resourceAssignment -> resourceAssignment.getRecord().getMapFields().values().stream()
+            .flatMap(instanceStateMap -> instanceStateMap.keySet().stream())
+            .collect(Collectors.toSet()).stream()).collect(Collectors.toSet());
+  }
 }


Mime
View raw message