helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [helix] branch replica_level_throttle updated: [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
Date Fri, 23 Apr 2021 21:58:56 GMT
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/replica_level_throttle by this push:
     new 78ef755  [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
78ef755 is described below

commit 78ef755c37433c41540ff948a7b97facf4e00b6e
Author: Junkai Xue <jxue@linkedin.com>
AuthorDate: Fri Apr 23 14:58:48 2021 -0700

    [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
    
    * Add per replica rebalance type compute logic
    
    Three functions added: 1) rebalance type computation required state.
                           2) rebalance type per message
                           3) message sorting rules and comparators to determine which message to apply first.
---
 .../stages/IntermediateStateCalcStage.java         | 82 +++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c0defa9..a840d5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -43,6 +45,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
@@ -796,6 +799,54 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
+   * Determine the message rebalance type with message and current states.
+   * @param desiredStates         Ideally how may states we needed for guarantee the health of replica
+   * @param message               The message to be determined what is the rebalance type
+   * @param derivedCurrentStates  Derived from current states with previous messages not be throttled.
+   * @return                      Rebalance type. Recovery or load.
+   */
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> desiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> desiredStatesSnapshot = new HashMap<>(desiredStates);
+    // Looping existing current states to see whether current states fulfilled all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (desiredStatesSnapshot.containsKey(state)) {
+        if (desiredStatesSnapshot.get(state) == 1) {
+          desiredStatesSnapshot.remove(state);
+        } else {
+          desiredStatesSnapshot.put(state, desiredStatesSnapshot.get(state) - 1);
+        }
+      }
+    }
+
+    // If the message contains any "required" state changes, then it is considered recovery rebalance.
+    // Otherwise, it is load balance.
+    return desiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE
+        : RebalanceType.LOAD_BALANCE;
+  }
+
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    int requiredNumReplica = idealState.getMinActiveReplicas() == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : idealState.getMinActiveReplicas();
+
+    // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
+    // preference list
+    LinkedHashMap<String, Integer> expectedStateCountMap = stateModelDefinition.getStateCountMap(
+        (int) preferenceList.stream()
+            .filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+            .count(), requiredNumReplica); // StateModelDefinition's counts
+
+    return expectedStateCountMap;
+  }
+
+  /**
    * Log rebalancer metadata for debugging purposes.
    * @param resource
    * @param allPartitions
@@ -874,7 +925,36 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  // Compare partitions according following standard:
+  private class MessagePriorityComparator implements Comparator<Message> {
+    private Map<String, Integer> _preferenceInstanceMap;
+    private Map<String, Integer> _statePriorityMap;
+
+    MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+      // Get instance -> priority map.
+      _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+          .boxed()
+          .collect(Collectors.toMap(preferenceList::get, index -> index));
+      _statePriorityMap = statePriorityMap;
+    }
+
+    @Override
+    public int compare(Message m1, Message m2) {
+      //  Compare rules:
+      //     1. Higher target state has higher priority.
+      //     2. If target state is same, range it as preference list order.
+      //     3. Sort by the name of targeted instances just for deterministic ordering.
+      if (m1.getToState().equals(m2.getToState()) && _preferenceInstanceMap.containsKey(m1.getTgtName())
+          && _preferenceInstanceMap.containsKey(m2.getTgtName())) {
+        return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));
+      }
+      if (!m1.getToState().equals(m2.getToState())) {
+        return _statePriorityMap.get(m1.getToState()).compareTo(_statePriorityMap.get(m2.getToState()));
+      }
+      return m1.getTgtName().compareTo(m2.getTgtName());
+    }
+  }
+
+    // Compare partitions according following standard:
   // 1) Partition without top state always is the highest priority.
   // 2) For partition with top-state, the more number of active replica it has, the less priority.
   private class PartitionPriorityComparator implements Comparator<Partition> {

Mime
View raw message