helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch wagedRebalancer updated: Implement Cluster Model Provider. (#392)
Date Tue, 13 Aug 2019 18:25:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 9bb8eec  Implement Cluster Model Provider. (#392)
9bb8eec is described below

commit 9bb8eec196418b23602a654304b8c871e28bec4a
Author: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Tue Aug 13 11:25:23 2019 -0700

    Implement Cluster Model Provider. (#392)
    
    * Implement Cluster Model Provider.
    
    The model provider is called in the WAGED rebalancer to generate CLuster Model based on
the current cluster status.
    The major responsibility of the provider is to parse all the assignable replicas and identify
which replicas need to be reassigned. Note that if the current best possible assignment is
still valid, the rebalancer won't need to calculate for the partition assignment.
    
    Also, add unit tests to verify the main logic.
---
 .../rebalancer/waged/ClusterDataDetector.java      |   3 +-
 .../rebalancer/waged/model/AssignableNode.java     |   2 +-
 .../rebalancer/waged/model/ClusterModel.java       |   1 -
 .../waged/model/ClusterModelProvider.java          | 247 +++++++++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  29 ++-
 .../waged/model/TestClusterModelProvider.java      | 247 +++++++++++++++++++++
 6 files changed, 517 insertions(+), 12 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
index 07f16dd..0423edf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
@@ -35,11 +35,12 @@ public class ClusterDataDetector<T extends BaseControllerDataProvider>
{
    * All the cluster change type that may trigger a WAGED rebalancer re-calculation.
    */
   public enum ChangeType {
+    BaselineAssignmentChange,
     InstanceConfigChange,
     ClusterConfigChange,
     ResourceConfigChange,
-    InstanceStateChange,
     ResourceIdealStatesChange,
+    InstanceStateChange,
     OtherChange
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 989323e..5fc04d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -52,7 +52,7 @@ public class AssignableNode {
   private int _maxPartition; // maximum number of the partitions that can be assigned to
the node.
 
   // proposed assignment tracking
-  // <resource name, partition name>
+  // <resource name, partition name set>
   private Map<String, Set<String>> _currentAssignments;
   // <resource name, top state partition name>
   private Map<String, Set<String>> _currentTopStateAssignments;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 1be527a..6c4e67b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  */
 
 import org.apache.helix.HelixException;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.Collections;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
new file mode 100644
index 0000000..9de023b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -0,0 +1,247 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This util class generates Cluster Model object based on the controller's data cache.
+ */
+public class ClusterModelProvider {
+
+  /**
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note
that any
+   *                               resources that are not in this list will be removed from
the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in the calculation.
+   *                               Note this list can be different from the real active node
list
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after the previous
rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated
in the
+   *                               previous rebalance.
+   * @return Generate a new Cluster Model object according to the current cluster status.
+   */
+  public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Generate replica objects for all the resource partitions.
+    // <resource, replica set>
+    Map<String, Set<AssignableReplica>> replicaMap =
+        parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+
+    // Check if the replicas need to be reassigned.
+    Map<String, Set<AssignableReplica>> allocatedReplicas =
+        new HashMap<>(); // <instanceName, replica set>
+    Set<AssignableReplica> toBeAssignedReplicas =
+        findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
+            bestPossibleAssignment, allocatedReplicas);
+
+    // Construct all the assignable nodes and initialize with the allocated replicas.
+    Set<AssignableNode> assignableNodes =
+        parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+            activeInstances, allocatedReplicas);
+
+    // Construct and initialize cluster context.
+    ClusterContext context = new ClusterContext(
+        replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
+        activeInstances.size());
+    // Initial the cluster context with the allocated assignments.
+    context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
+
+    return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
+        bestPossibleAssignment);
+  }
+
+  /**
+   * Find the minimum set of replicas that need to be reassigned.
+   * A replica needs to be reassigned if one of the following condition is true:
+   * 1. Cluster topology (the cluster config / any instance config) has been updated.
+   * 2. The baseline assignment has been updated.
+   * 3. The resource config has been updated.
+   * 4. The resource idealstate has been updated. TODO remove this condition when all resource
configurations are migrated to resource config.
+   * 5. If the current best possible assignment does not contain the partition's valid assignment.
+   *
+   * @param replicaMap             A map contains all the replicas grouped by resource name.
+   * @param clusterChanges         A map contains all the important metadata updates that
happened after the previous rebalance.
+   * @param activeInstances        All the instances that are alive and enabled.
+   * @param bestPossibleAssignment The current best possible assignment.
+   * @param allocatedReplicas      Return the allocated replicas grouped by the target instance
name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicas(
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String>
activeInstances,
+      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange))
{
+      // If the cluster topology or baseline assignment has been modified, need to reassign
all replicas
+      toBeAssignedReplicas
+          .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
+    } else {
+      // check each resource to identify the allocated replicas and to-be-assigned replicas.
+      for (String resourceName : replicaMap.keySet()) {
+        Set<AssignableReplica> replicas = replicaMap.get(resourceName);
+        // 1. if the resource config/idealstate is changed, need to reassign.
+        // 2. if the resource does appear in the best possible assignment, need to reassign.
+        if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.emptySet()).contains(resourceName) || clusterChanges
+            .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
+                Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
+            .containsKey(resourceName)) {
+          toBeAssignedReplicas.addAll(replicas);
+          continue; // go to check next resource
+        } else {
+          // check for every best possible assignments to identify if the related replicas
need to reassign.
+          ResourceAssignment assignment = bestPossibleAssignment.get(resourceName);
+          // <partition, <instance, state>>
+          Map<String, Map<String, String>> stateMap = assignment.getMappedPartitions().stream()
+              .collect(Collectors.toMap(partition -> partition.getPartitionName(),
+                  partition -> new HashMap<>(assignment.getReplicaMap(partition))));
+          for (AssignableReplica replica : replicas) {
+            // Find any ACTIVE instance allocation that has the same state with the replica
+            Optional<Map.Entry<String, String>> instanceNameOptional =
+                stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap()).entrySet()
+                    .stream().filter(instanceStateMap ->
+                    instanceStateMap.getValue().equals(replica.getReplicaState()) &&
activeInstances
+                        .contains(instanceStateMap.getKey())).findAny();
+            // 3. if no such an instance in the bestPossible assignment, need to reassign
the replica
+            if (!instanceNameOptional.isPresent()) {
+              toBeAssignedReplicas.add(replica);
+              continue; // go to check the next replica
+            } else {
+              String instanceName = instanceNameOptional.get().getKey();
+              // * cleanup the best possible state map record,
+              // * so the selected instance won't be picked up again for the another replica
check
+              stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+                  .remove(instanceName);
+              // the current best possible assignment for this replica is valid,
+              // add to the allocated replica list.
+              allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
+            }
+          }
+        }
+      }
+    }
+    return toBeAssignedReplicas;
+  }
+
+  /**
+   * Parse all the nodes that can be assigned replicas based on the configurations.
+   *
+   * @param clusterConfig     The cluster configuration.
+   * @param instanceConfigMap A map of all the instance configuration.
+   * @param activeInstances   All the instances that are online and enabled.
+   * @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned
during the rebalance.
+   * @return A map of assignable node set, <InstanceName, node set>.
+   */
+  private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
+      Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    return activeInstances.stream().map(
+        instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
+            instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Parse all the replicas that need to be reallocated from the cluster data cache.
+   *
+   * @param dataProvider The cluster status cache that contains the current cluster status.
+   * @param resourceMap  All the valid resources that are managed by the rebalancer.
+   * @return A map of assignable replica set, <ResourceName, replica set>.
+   */
+  private static Map<String, Set<AssignableReplica>> parseAllReplicas(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      int instanceCount) {
+    Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
+
+    for (String resourceName : resourceMap.keySet()) {
+      ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+      IdealState is = dataProvider.getIdealState(resourceName);
+      if (is == null) {
+        throw new HelixException(
+            "Cannot find the resource ideal state for resource: " + resourceName);
+      }
+      String defName = is.getStateModelDefRef();
+      StateModelDefinition def = dataProvider.getStateModelDef(defName);
+      if (def == null) {
+        throw new IllegalArgumentException(String
+            .format("Cannot find state model definition %s for resource %s.",
+                is.getStateModelDefRef(), resourceName));
+      }
+
+      Map<String, Integer> stateCountMap =
+          def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
+
+      for (String partition : is.getPartitionSet()) {
+        for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
+          String state = entry.getKey();
+          for (int i = 0; i < entry.getValue(); i++) {
+            totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
+                new AssignableReplica(config, partition, state,
+                    def.getStatePriorityMap().get(state)));
+          }
+        }
+      }
+    }
+    return totalReplicaMap;
+  }
+
+  /**
+   * @return A map contains the assignments for each fault zone. <fault zone, <resource,
set of partitions>>
+   */
+  private static Map<String, Map<String, Set<String>>> mapAssignmentToFaultZone(
+      Set<AssignableNode> assignableNodes) {
+    Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new
HashMap<>();
+    assignableNodes.stream().forEach(node -> {
+      for (Map.Entry<String, Set<String>> resourceMap : node.getCurrentAssignmentsMap()
+          .entrySet()) {
+        faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new HashMap<>())
+            .computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
+            .addAll(resourceMap.getValue());
+      }
+    });
+    return faultZoneAssignmentMap;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 0e2b43a..d99a3fb 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
+  protected static String _sessionId = "testSessionId";
   protected String _testInstanceId;
   protected List<String> _resourceNames;
   protected List<String> _partitionNames;
@@ -73,16 +74,27 @@ public abstract class AbstractTestClusterModel {
     _testFaultZoneId = "testZone";
   }
 
+  InstanceConfig createMockInstanceConfig(String instanceId) {
+    InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
+    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+    testInstanceConfig.addTag(_testInstanceTags.get(0));
+    testInstanceConfig.setInstanceEnabled(true);
+    testInstanceConfig.setZoneId(_testFaultZoneId);
+    return testInstanceConfig;
+  }
+
+  LiveInstance createMockLiveInstance(String instanceId) {
+    LiveInstance testLiveInstance = new LiveInstance(instanceId);
+    testLiveInstance.setSessionId(_sessionId);
+    return testLiveInstance;
+  }
+
   protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
     ResourceControllerDataProvider testCache = Mockito.mock(ResourceControllerDataProvider.class);
 
     // 1. Set up the default instance information with capacity configuration.
-    InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId");
-    testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
-    testInstanceConfig.addTag(_testInstanceTags.get(0));
+    InstanceConfig testInstanceConfig = createMockInstanceConfig(_testInstanceId);
     testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
-    testInstanceConfig.setInstanceEnabled(true);
-    testInstanceConfig.setZoneId(_testFaultZoneId);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
@@ -95,8 +107,7 @@ public abstract class AbstractTestClusterModel {
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     // 3. Mock the live instance node for the default instance.
-    LiveInstance testLiveInstance = new LiveInstance(_testInstanceId);
-    testLiveInstance.setSessionId("testSessionId");
+    LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
     Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
     liveInstanceMap.put(_testInstanceId, testLiveInstance);
     when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
@@ -130,7 +141,7 @@ public abstract class AbstractTestClusterModel {
     Map<String, CurrentState> currentStatemap = new HashMap<>();
     currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
     currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
-    when(testCache.getCurrentState(_testInstanceId, "testSessionId")).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
 
     // 5. Set up the resource config for the two resources with the partition weight.
     Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
@@ -162,7 +173,7 @@ public abstract class AbstractTestClusterModel {
   protected Set<AssignableReplica> generateReplicas(ResourceControllerDataProvider
dataProvider) {
     // Create assignable replica based on the current state.
     Map<String, CurrentState> currentStatemap =
-        dataProvider.getCurrentState(_testInstanceId, "testSessionId");
+        dataProvider.getCurrentState(_testInstanceId, _sessionId);
     Set<AssignableReplica> assignmentSet = new HashSet<>();
     for (CurrentState cs : currentStatemap.values()) {
       ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
new file mode 100644
index 0000000..f92a66c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -0,0 +1,247 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+public class TestClusterModelProvider extends AbstractTestClusterModel {
+  Set<String> _instances;
+
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+    _instances = new HashSet<>();
+    _instances.add(_testInstanceId);
+  }
+
+  @Override
+  protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+    ResourceControllerDataProvider testCache = super.setupClusterDataCache();
+
+    // Set up mock idealstate
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState is = new IdealState(resource);
+      is.setNumPartitions(_partitionNames.size());
+      is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      is.setStateModelDefRef("MasterSlave");
+      is.setReplicas("3");
+      is.setRebalancerClassName(WagedRebalancer.class.getName());
+      _partitionNames.stream()
+          .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
+      isMap.put(resource, is);
+    }
+    when(testCache.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+
+    // Set up 2 more instances
+    for (int i = 1; i < 3; i++) {
+      String instanceName = _testInstanceId + i;
+      _instances.add(instanceName);
+      // 1. Set up the default instance information with capacity configuration.
+      InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
+      Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+      instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+      // 2. Mock the live instance node for the default instance.
+      LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
+      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      liveInstanceMap.put(instanceName, testLiveInstance);
+      when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    }
+
+    return testCache;
+  }
+
+  @Test
+  public void testGenerateClusterModel() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    // 1. test generating a cluster model with empty assignment
+    ClusterModel clusterModel = ClusterModelProvider.generateClusterModel(testCache,
+        _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+    // There should be no existing assignment.
+    Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .anyMatch(resourceMap -> !resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Have all 3 instances
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), _instances);
+    // Shall have 2 resources and 12 replicas
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 12));
+
+    // 2. test with only one active node
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        Collections.singleton(_testInstanceId), Collections.emptyMap(), Collections.emptyMap(),
+        Collections.emptyMap());
+    // Have only one instance
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), Collections.singleton(_testInstanceId));
+    // Shall have 4 assignable replicas because there is only one valid node.
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 4));
+
+    // 3. test with no active instance
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
+        Collections.emptyMap());
+    // Have only one instance
+    Assert.assertEquals(clusterModel.getAssignableNodes().size(), 0);
+    // Shall have 0 assignable replicas because there is only n0 valid node.
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.isEmpty()));
+
+    // 4. test with best possible assignment
+    // Mock a best possible assignment based on the current states.
+    Map<String, ResourceAssignment> bestPossibleAssignment = new HashMap<>();
+    for (String resource : _resourceNames) {
+      // <partition, <instance, state>>
+      Map<String, Map<String, String>> assignmentMap = new HashMap<>();
+      CurrentState cs = testCache.getCurrentState(_testInstanceId, _sessionId).get(resource);
+      if (cs != null) {
+        for (Map.Entry<String, String> stateEntry : cs.getPartitionStateMap().entrySet())
{
+          assignmentMap.computeIfAbsent(stateEntry.getKey(), k -> new HashMap<>())
+              .put(_testInstanceId, stateEntry.getValue());
+        }
+        ResourceAssignment assignment = new ResourceAssignment(resource);
+        assignmentMap.keySet().stream().forEach(partition -> assignment
+            .addReplicaMap(new Partition(partition), assignmentMap.get(partition)));
+        bestPossibleAssignment.put(resource, assignment);
+      }
+    }
+
+    // Generate a cluster model based on the best possible assignment
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), bestPossibleAssignment);
+    // There should be 4 existing assignments in total (each resource has 2) in the specified
instance
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.values().stream()
+            .allMatch(partitionSet -> partitionSet.size() == 2)));
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().get(_testInstanceId).getCurrentAssignmentCount(),
4);
+    // Since each resource has 2 replicas assigned, the assignable replica count should be
10.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 10));
+
+    // 5. test with best possible assignment but cluster topology is changed
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange,
+            Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment);
+    // There should be no existing assignment since the topology change invalidates all existing
assignment
+    Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .allMatch(resourceMap -> resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Shall have 2 resources and 12 replicas
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 12));
+
+    // 6. test with best possible assignment and one resource config change
+    // Generate a cluster model based on the same best possible assignment, but resource1
config is changed
+    String changedResourceName = _resourceNames.get(0);
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.singleton(changedResourceName)), Collections.emptyMap(),
+        bestPossibleAssignment);
+    // There should be no existing assignment for all the resource except for resource2.
+    Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(),
1);
+    Map<String, Set<String>> resourceAssignmentMap =
+        clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
+    // Should be only resource2 in the map
+    Assert.assertEquals(resourceAssignmentMap.size(), 1);
+    for (String resource : _resourceNames) {
+      Assert
+          .assertEquals(resourceAssignmentMap.getOrDefault(resource, Collections.emptySet()).size(),
+              resource.equals(changedResourceName) ? 0 : 2);
+    }
+    // Only the first instance will have 2 assignment from resource2.
+    for (String instance : _instances) {
+      Assert
+          .assertEquals(clusterModel.getAssignableNodes().get(instance).getCurrentAssignmentCount(),
+              instance.equals(_testInstanceId) ? 2 : 0);
+    }
+    // Shall have 2 resources and 12 replicas
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().keySet().size(), 2);
+    for (String resource : _resourceNames) {
+      Assert.assertEquals(clusterModel.getAssignableReplicaMap().get(resource).size(),
+          resource.equals(changedResourceName) ? 12 : 10);
+    }
+
+    // 7. test with best possible assignment but the instance becomes inactive
+    // Generate a cluster model based on the best possible assignment, but the assigned node
is disabled
+    Set<String> limitedActiveInstances = new HashSet<>(_instances);
+    limitedActiveInstances.remove(_testInstanceId);
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        limitedActiveInstances, Collections.emptyMap(), Collections.emptyMap(),
+        bestPossibleAssignment);
+    // There should be no existing assignment.
+    Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+        .anyMatch(resourceMap -> !resourceMap.isEmpty()));
+    Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+    // Have only 2 instances
+    Assert.assertEquals(
+        clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
+            .collect(Collectors.toSet()), limitedActiveInstances);
+    // Since only 2 instances are active, we shall have 8 assignable replicas in each resource.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 8));
+
+  }
+}


Mime
View raw message