helix-commits mailing list archives

From j...@apache.org
Subject [1/2] helix git commit: [HELIX-674] Introducing constraints based rebalancing mechanism.
Date Tue, 24 Apr 2018 00:17:05 GMT
Repository: helix
Updated Branches:
  refs/heads/master 317c300c8 -> 3d2d57b05


http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
new file mode 100644
index 0000000..5bd3bdb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
@@ -0,0 +1,216 @@
+package org.apache.helix.util;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.*;
+
+import java.util.*;
+
+/**
+ * A rebalance tool that generates a resource partition assignment based on the input.
+ * Note that the assignment will not be automatically applied to the cluster. Users are expected to
+ * apply the change.
+ *
+ * @see org.apache.helix.examples.WeightAwareRebalanceUtilExample WeightAwareRebalanceUtilExample
+ */
+public class WeightAwareRebalanceUtil {
+  private final ClusterConfig _clusterConfig;
+  private final Map<String, InstanceConfig> _instanceConfigMap = new HashMap<>();
+  // For the possible customized state models.
+  private final Map<String, StateModelDefinition> _stateModelDefs = new HashMap<>();
+  private final ClusterDataCache _dataCache;
+
+  private enum RebalanceOption {
+    INCREMENTAL,
+    FULL
+  }
+
+  /**
+   * Initialize the rebalance util with cluster and instance information.
+   *
+   * Note that it is not required to put any configuration items in these configs.
+   * However, in order to do topology aware rebalance, users need to set topology information such as Domain, fault zone, and TopologyAwareEnabled.
+   *
+   * The other config items will not be read or processed by the util.
+   *
+   * @param clusterConfig ClusterConfig of the cluster. Only the topology-related items are read.
+   * @param instanceConfigs InstanceConfigs for all assignment candidates.
+   *                        Note that all instances will be treated as enabled and alive during the calculation.
+   */
+  public WeightAwareRebalanceUtil(ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs) {
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      // ensure the instance is enabled
+      instanceConfig.setInstanceEnabled(true);
+      _instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig);
+    }
+    // ensure no instance is disabled
+    clusterConfig.setDisabledInstances(Collections.<String, String>emptyMap());
+    _clusterConfig = clusterConfig;
+
+    _dataCache = new ClusterDataCache();
+    _dataCache.setInstanceConfigMap(_instanceConfigMap);
+    _dataCache.setClusterConfig(_clusterConfig);
+    List<LiveInstance> liveInstanceList = new ArrayList<>();
+    for (String instance : _instanceConfigMap.keySet()) {
+      LiveInstance liveInstance = new LiveInstance(instance);
+      liveInstanceList.add(liveInstance);
+    }
+    _dataCache.setLiveInstances(liveInstanceList);
+  }
+
+  /**
+   * Generate partition assignments for all new resources or partitions that have not been assigned yet.
+   * Note that a partition assignment that does not fit the state model will still be recalculated.
+   * For example, if the replica requirement is 3 but a partition has only 2 replicas, that partition will still
+   * be rebalanced even though an existing assignment exists.
+   *
+   * @param resourceConfigs    Config of all the resources that need to be rebalanced.
+   *                           The tool throws an exception if any resource has no IdealState (IS) or a broken/uninitialized IS.
+   *                           The tool throws an exception if any resource is in FULL_AUTO mode.
+   *                           Following fields are required by the tool:
+   *                           1. ResourceName
+   *                           2. StateModelDefRef
+   *                           3. PreferenceLists, which includes all partitions in the resource
+   *                           4. NumReplica
+   * @param existingAssignment The existing partition assignment of the resources specified in param resourceConfigs.
+   *                           Unrelated resource assignment will be discarded.
+   * @param hardConstraints    Hard constraints for rebalancing.
+   * @param softConstraints    Soft constraints for rebalancing.
+   *
+   * @return A ResourcesStateMap that contains the preference lists and suggested state maps of the resources
+   **/
+  public ResourcesStateMap buildIncrementalRebalanceAssignment(List<ResourceConfig> resourceConfigs,
+      ResourcesStateMap existingAssignment,
+      List<? extends AbstractRebalanceHardConstraint> hardConstraints,
+      List<? extends AbstractRebalanceSoftConstraint> softConstraints) {
+    return calculateAssignment(resourceConfigs, existingAssignment, RebalanceOption.INCREMENTAL,
+        hardConstraints, softConstraints);
+  }
+
+  /**
+   * Re-calculate the partition assignments for all the resources specified in resourceConfigs list.
+   *
+   * @param resourceConfigs    Config of all the resources that need to be rebalanced.
+   *                           The tool throws an exception if any resource has no IdealState (IS) or a broken/uninitialized IS.
+   *                           The tool throws an exception if any resource is in FULL_AUTO mode.
+   *                           Following fields are required by the tool:
+   *                           1. ResourceName
+   *                           2. StateModelDefRef
+   *                           3. PreferenceLists, which includes all partitions in the resource
+   *                           4. NumReplica
+   * @param preferredAssignment A set of preferred partition assignments for the resources specified in param resourceConfigs.
+   *                            The preference is not guaranteed.
+   * @param hardConstraints    Hard constraints for rebalancing.
+   * @param softConstraints    Soft constraints for rebalancing.
+   *
+   * @return A ResourcesStateMap that contains the preference lists and suggested state maps of the resources
+   **/
+  public ResourcesStateMap buildFullRebalanceAssignment(List<ResourceConfig> resourceConfigs,
+      ResourcesStateMap preferredAssignment,
+      List<? extends AbstractRebalanceHardConstraint> hardConstraints,
+      List<? extends AbstractRebalanceSoftConstraint> softConstraints) {
+    return calculateAssignment(resourceConfigs, preferredAssignment, RebalanceOption.FULL,
+        hardConstraints, softConstraints);
+  }
+
+  /**
+   * The method to generate partition assignment mappings.
+   *
+   * @param resourceConfigs    Config of all the resources that need to be rebalanced.
+   *                           The tool throws an exception if any resource has no IdealState (IS) or a broken/uninitialized IS.
+   *                           The tool throws an exception if any resource is in FULL_AUTO mode.
+   *                           Following fields are required by the tool:
+   *                           1. ResourceName
+   *                           2. StateModelDefRef
+   *                           3. PreferenceLists, which includes all partitions in the resource
+   *                           4. NumReplica
+   * @param existingAssignment The existing partition assignment of the resources specified in param resourceConfigs.
+   * @param option             INCREMENTAL or FULL
+   *                           INCREMENTAL: Keep the existing assignment and only generate assignments for unassigned partitions.
+   *                           FULL: Completely re-assign resources' partitions.
+   * @param hardConstraints    Hard constraints for rebalancing.
+   * @param softConstraints    Soft constraints for rebalancing.
+   *
+   * @return A ResourcesStateMap that contains the preference lists and suggested state maps of the resources
+   **/
+  private ResourcesStateMap calculateAssignment(List<ResourceConfig> resourceConfigs,
+      ResourcesStateMap existingAssignment, RebalanceOption option,
+      List<? extends AbstractRebalanceHardConstraint> hardConstraints,
+      List<? extends AbstractRebalanceSoftConstraint> softConstraints) {
+    // check the inputs
+    for (ResourceConfig resourceConfig : resourceConfigs) {
+      RebalanceConfig.RebalanceMode rebalanceMode =
+          resourceConfig.getRebalanceConfig().getRebalanceMode();
+      if (rebalanceMode.equals(RebalanceConfig.RebalanceMode.FULL_AUTO)) {
+        throw new HelixException(
+            "Resources that in FULL_AUTO mode are not supported: " + resourceConfig
+                .getResourceName());
+      }
+    }
+
+    ConstraintRebalanceStrategy constraintBasedStrategy =
+        new ConstraintRebalanceStrategy(hardConstraints, softConstraints);
+
+    ResourcesStateMap resultAssignment = new ResourcesStateMap();
+
+    for (ResourceConfig resourceConfig : resourceConfigs) {
+      Map<String, Map<String, String>> preferredMapping = new HashMap<>();
+      if (existingAssignment != null) {
+        PartitionStateMap partitionStateMap = existingAssignment.getPartitionStateMap(resourceConfig.getResourceName());
+        // keep existing assignment if rebalance option is INCREMENTAL
+        if (option.equals(RebalanceOption.INCREMENTAL) && partitionStateMap != null) {
+          for (Partition partition : partitionStateMap.getStateMap().keySet()) {
+            preferredMapping.put(partition.getPartitionName(), partitionStateMap.getPartitionMap(partition));
+          }
+        }
+      }
+
+      StateModelDefinition stateModelDefinition =
+          getStateModelDef(resourceConfig.getStateModelDefRef());
+      constraintBasedStrategy.init(resourceConfig.getResourceName(),
+          new ArrayList<>(resourceConfig.getPreferenceLists().keySet()), stateModelDefinition
+              .getStateCountMap(_instanceConfigMap.size(),
+                  Integer.parseInt(resourceConfig.getNumReplica())), Integer.MAX_VALUE);
+
+      List<String> instanceNames = new ArrayList<>(_instanceConfigMap.keySet());
+      ZNRecord znRecord = constraintBasedStrategy
+          .computePartitionAssignment(instanceNames, instanceNames, preferredMapping, _dataCache);
+      Map<String, Map<String, String>> stateMap = znRecord.getMapFields();
+      // Construct resource states result
+      PartitionStateMap newStateMap = new PartitionStateMap(resourceConfig.getResourceName());
+      for (String partition : stateMap.keySet()) {
+        newStateMap.setState(new Partition(partition), stateMap.get(partition));
+      }
+      resultAssignment.setState(resourceConfig.getResourceName(), newStateMap);
+    }
+    return resultAssignment;
+  }
+
+  private StateModelDefinition getStateModelDef(String stateModelDefRef) {
+    if (_stateModelDefs.containsKey(stateModelDefRef)) {
+      return _stateModelDefs.get(stateModelDefRef);
+    }
+    return BuiltInStateModelDefinitions.valueOf(stateModelDefRef).getStateModelDefinition();
+  }
+
+  /**
+   * Since the tool is designed not to rely on ZK, if the application uses a customized state model,
+   * its definition needs to be registered with the tool before requesting an assignment.
+   *
+   * @param stateModelDefRef Reference name of the customized state model definition.
+   * @param stateModelDefinition The customized state model definition to register.
+   */
+  public void registerCustomizedStateModelDef(String stateModelDefRef,
+      StateModelDefinition stateModelDefinition) {
+    _stateModelDefs.put(stateModelDefRef, stateModelDefinition);
+  }
+}
+
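For orientation, here is a minimal usage sketch of the new utility (a fragment: imports and error handling elided; names such as "ExampleCluster" are illustrative, and the providers are the mock implementations added later in this commit):

  // Describe the cluster with plain config objects; no ZK connection is required.
  ClusterConfig clusterConfig = new ClusterConfig("ExampleCluster");
  List<InstanceConfig> instanceConfigs = new ArrayList<>();
  for (int i = 0; i < 3; i++) {
    instanceConfigs.add(new InstanceConfig("node" + i));
  }

  // Describe a resource: name, state model, replica count, and its partitions.
  ResourceConfig.Builder builder = new ResourceConfig.Builder("ExampleDB");
  builder.setStateModelDefRef("OnlineOffline");
  builder.setNumReplica(2);
  builder.setPreferenceList("ExampleDB_0", Collections.<String>emptyList());
  ResourceConfig resource = builder.build();

  // Constraints are assembled from weight and capacity providers.
  PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(10);
  CapacityProvider capacityProvider =
      new MockCapacityProvider(Collections.<String, Integer>emptyMap(), 100);
  TotalCapacityConstraint hardConstraint =
      new TotalCapacityConstraint(weightProvider, capacityProvider);
  PartitionWeightAwareEvennessConstraint softConstraint =
      new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

  // The util only computes an assignment; applying it to the cluster is left to the caller.
  WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
  // For a customized state model, register its definition first (the util does not read ZK):
  // util.registerCustomizedStateModelDef("CustomizedOnlineOffline", OnlineOfflineSMD.build());
  ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(
      Collections.singletonList(resource), null,
      Collections.<AbstractRebalanceHardConstraint>singletonList(hardConstraint),
      Collections.<AbstractRebalanceSoftConstraint>singletonList(softConstraint));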

http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java
new file mode 100644
index 0000000..e85e65c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestConstraintRebalanceStrategy.java
@@ -0,0 +1,453 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
+import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockCapacityProvider;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockPartitionWeightProvider;
+import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+public class TestConstraintRebalanceStrategy {
+  private static Logger _logger = LoggerFactory.getLogger(TestConstraintRebalanceStrategy.class);
+
+  final String resourceNamePrefix = "resource";
+  final int nParticipants = 40;
+  final int nResources = 20;
+  final int nPartitions = 100;
+  final int nReplicas = 3;
+  final int defaultCapacity = 6000; // total = 6000*40 = 240000
+  final int resourceWeight = 10; // total = 20*100*3*10 = 60000
+  final String topState = "ONLINE";
+
+  final List<String> resourceNames = new ArrayList<>();
+  final List<String> instanceNames = new ArrayList<>();
+  final List<String> partitions = new ArrayList<>(nPartitions);
+
+  final ClusterDataCache cache = new ClusterDataCache();
+  final LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+
+  @BeforeClass
+  public void beforeClass() {
+    for (int i = 0; i < nResources; i++) {
+      resourceNames.add(resourceNamePrefix + i);
+    }
+    for (int i = 0; i < nParticipants; i++) {
+      instanceNames.add("node" + i);
+    }
+    for (int i = 0; i < nPartitions; i++) {
+      partitions.add(Integer.toString(i));
+    }
+
+    setupMockCluster();
+  }
+
+  private void setupMockCluster() {
+    List<LiveInstance> liveInstanceList = new ArrayList<>();
+    Map<String, InstanceConfig> instanceConfigs = new HashMap<>();
+    for (String instance : instanceNames) {
+      LiveInstance liveInstance = new LiveInstance(instance);
+      liveInstanceList.add(liveInstance);
+      InstanceConfig config = new InstanceConfig(instance);
+      instanceConfigs.put(instance, config);
+    }
+    cache.setLiveInstances(liveInstanceList);
+    cache.setInstanceConfigMap(instanceConfigs);
+    ClusterConfig clusterConfig = new ClusterConfig("test");
+    clusterConfig.setTopologyAwareEnabled(false);
+    cache.setClusterConfig(clusterConfig);
+
+    states.put("OFFLINE", 0);
+    states.put(topState, nReplicas);
+  }
+
+  private Map<String, Map<String, Map<String, String>>> calculateAssignment(
+      List<AbstractRebalanceHardConstraint> hardConstraints,
+      List<AbstractRebalanceSoftConstraint> softConstraints) {
+    Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
+
+    ConstraintRebalanceStrategy strategy =
+        new ConstraintRebalanceStrategy(hardConstraints, softConstraints);
+
+    for (String resourceName : resourceNames) {
+      Map<String, Map<String, String>> partitionMap = new HashMap<>();
+
+      strategy.init(resourceName, partitions, states, Integer.MAX_VALUE);
+      partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames,
+          new HashMap<String, Map<String, String>>(), cache).getMapFields());
+      result.put(resourceName, partitionMap);
+    }
+    return result;
+  }
+
+  private Map<String, Integer> checkPartitionUsage(
+      Map<String, Map<String, Map<String, String>>> assignment,
+      PartitionWeightProvider weightProvider) {
+    Map<String, Integer> weightCount = new HashMap<>();
+    for (String resource : assignment.keySet()) {
+      Map<String, Map<String, String>> partitionMap = assignment.get(resource);
+      for (String partition : partitionMap.keySet()) {
+        // check states
+        Map<String, Integer> stateCount = new HashMap<>(states);
+        Map<String, String> stateMap = partitionMap.get(partition);
+        for (String state : stateMap.values()) {
+          Assert.assertTrue(stateCount.containsKey(state));
+          stateCount.put(state, stateCount.get(state) - 1);
+        }
+        for (int count : stateCount.values()) {
+          Assert.assertEquals(count, 0);
+        }
+
+        // report weight
+        int partitionWeight = weightProvider.getPartitionWeight(resource, partition);
+        for (String instance : partitionMap.get(partition).keySet()) {
+          if (!weightCount.containsKey(instance)) {
+            weightCount.put(instance, partitionWeight);
+          } else {
+            weightCount.put(instance, weightCount.get(instance) + partitionWeight);
+          }
+        }
+      }
+    }
+    return weightCount;
+  }
+
+  @Test
+  public void testEvenness() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    Map<String, Map<String, Map<String, String>>> assignment = calculateAssignment(
+        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+  }
+
+  @Test
+  public void testEvennessByDefaultConstraint() {
+    Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
+
+    ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy();
+
+    for (String resourceName : resourceNames) {
+      Map<String, Map<String, String>> partitionMap = new HashMap<>();
+
+      strategy.init(resourceName, partitions, states, Integer.MAX_VALUE);
+      partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames,
+          new HashMap<String, Map<String, String>>(), cache).getMapFields());
+      result.put(resourceName, partitionMap);
+    }
+
+    Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() {
+      @Override
+      public int getPartitionWeight(String resource, String partition) {
+        return 1;
+      }
+    });
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+  }
+
+  @Test
+  public void testCapacityAwareEvenness() {
+    // capacity / weight
+    int totalBucket = 0;
+    Map<String, Integer> capacity = new HashMap<>();
+    for (int i = 0; i < instanceNames.size(); i++) {
+      capacity.put(instanceNames.get(i), defaultCapacity * (1 + i % 3));
+      totalBucket += 1 + i % 3;
+    }
+    int partitionWeightGranularity = (int) (resourceWeight * 1.5);
+    int totalPartitionWeight = 0;
+    Random ran = new Random(System.currentTimeMillis());
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    for (String resource : resourceNames) {
+      Map<String, Integer> weights = new HashMap<>();
+      for (String partition : partitions) {
+        int weight = resourceWeight / 2 + ran.nextInt(resourceWeight);
+        weights.put(partition, weight);
+        totalPartitionWeight += weight * nReplicas;
+      }
+      partitionWeightMap.put(resource, weights);
+    }
+
+    PartitionWeightProvider weightProvider =
+        new MockPartitionWeightProvider(partitionWeightMap, resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    Map<String, Map<String, Map<String, String>>> assignment =
+        calculateAssignment(Collections.EMPTY_LIST,
+            Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    for (int i = 0; i < instanceNames.size(); i++) {
+      String instanceName = instanceNames.get(i);
+      int expectedUsage = (int) ((double) totalPartitionWeight / totalBucket * (1 + i % 3));
+      int realUsage = weightCount.get(instanceName);
+      // With heterogeneous capacities, the rebalance calculation involves more fractional values,
+      // so relax the check to 90%-110% of the ideal value.
+      Assert.assertTrue((expectedUsage - partitionWeightGranularity) * 0.9 <= realUsage
+          && (expectedUsage + partitionWeightGranularity) * 1.1 >= realUsage);
+    }
+  }
+
+  @Test
+  public void testHardConstraintFails() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      // insufficient capacity
+      capacity.put(instance, defaultCapacity / 100);
+    }
+
+    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+
+    try {
+      calculateAssignment(
+          Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+          Collections.EMPTY_LIST);
+      Assert.fail("Assignment should fail because of insufficient capacity.");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  @Test(dependsOnMethods = "testHardConstraintFails")
+  public void testConflictConstraint() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      // sufficient capacity; the conflict comes from the second constraint's empty capacity map
+      capacity.put(instance, defaultCapacity);
+    }
+
+    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    TotalCapacityConstraint normalCapacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    TotalCapacityConstraint conflictingCapacityConstraint =
+        new TotalCapacityConstraint(weightProvider,
+            new MockCapacityProvider(Collections.EMPTY_MAP, 0));
+    List<AbstractRebalanceHardConstraint> constraints = new ArrayList<>();
+    constraints.add(normalCapacityConstraint);
+    constraints.add(conflictingCapacityConstraint);
+
+    try {
+      calculateAssignment(constraints, Collections.EMPTY_LIST);
+      Assert.fail("Assignment should fail because of the conflicting capacity constraint.");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  @Test(dependsOnMethods = "testEvenness")
+  public void testSoftConstraintFails() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      // insufficient capacity
+      capacity.put(instance, defaultCapacity / 50);
+    }
+
+    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    Map<String, Map<String, Map<String, String>>> assignment =
+        calculateAssignment(Collections.EMPTY_LIST,
+            Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+  }
+
+  @Test(dependsOnMethods = "testEvenness")
+  public void testRebalanceWithPreferredAssignment() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    PartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+    CapacityProvider capacityProvider = new MockCapacityProvider(capacity, 0);
+
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    // inject a valid partition assignment for one resource into the preferred assignment.
+    List<String> instances = instanceNames.subList(0, nReplicas);
+    Map<String, Map<String, String>> preferredPartitionAssignment = new HashMap<>();
+    Map<String, String> replicaState = new HashMap<>();
+    for (String instance : instances) {
+      replicaState.put(instance, topState);
+    }
+    preferredPartitionAssignment.put(partitions.get(0), replicaState);
+    Map<String, Map<String, Map<String, String>>> preferredAssignment = new HashMap<>();
+    preferredAssignment.put(resourceNames.get(0), preferredPartitionAssignment);
+
+    // inject an invalid partition assignment for another resource into the preferred assignment.
+    instances = instanceNames.subList(0, nReplicas - 1);
+    Map<String, String> invalidReplicaState = new HashMap<>();
+    for (String instance : instances) {
+      invalidReplicaState.put(instance, topState);
+    }
+    preferredPartitionAssignment = new HashMap<>();
+    preferredPartitionAssignment.put(partitions.get(0), invalidReplicaState);
+    preferredAssignment.put(resourceNames.get(1), preferredPartitionAssignment);
+
+    Map<String, Map<String, Map<String, String>>> assignment = new HashMap<>();
+    ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(Collections.EMPTY_LIST,
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    for (String resourceName : resourceNames) {
+      Map<String, Map<String, String>> partitionMap = new HashMap<>();
+
+      strategy.init(resourceName, partitions, states, Integer.MAX_VALUE);
+      partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames,
+          preferredAssignment.containsKey(resourceName) ?
+              preferredAssignment.get(resourceName) :
+              Collections.EMPTY_MAP, cache).getMapFields());
+      assignment.put(resourceName, partitionMap);
+    }
+
+    // Even with the preferred assignment, the weights should still be balanced
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+
+    // the resource 0 assignment should be kept the same
+    Collection<String> resource_0_Assignment =
+        assignment.get(resourceNames.get(0)).get(partitions.get(0)).keySet();
+    Assert.assertTrue(resource_0_Assignment.containsAll(instanceNames.subList(0, nReplicas))
+        && resource_0_Assignment.size() == nReplicas);
+    // the resource 1 assignment should be set to a valid one
+    Assert.assertTrue(
+        assignment.get(resourceNames.get(1)).get(partitions.get(0)).size() == nReplicas);
+  }
+
+  @Test
+  public void testTopologyAwareAssignment() {
+    // Topology Aware configuration
+    ClusterDataCache cache = new ClusterDataCache();
+    List<LiveInstance> liveInstanceList = new ArrayList<>();
+    Map<String, InstanceConfig> instanceConfigs = new HashMap<>();
+    for (int i = 0; i < instanceNames.size(); i++) {
+      String instance = instanceNames.get(i);
+      LiveInstance liveInstance = new LiveInstance(instance);
+      liveInstanceList.add(liveInstance);
+      InstanceConfig config = new InstanceConfig(instance);
+      config.setDomain(String.format("Rack=%s,Host=%s", i % (nParticipants / 5), instance));
+      instanceConfigs.put(instance, config);
+    }
+    cache.setLiveInstances(liveInstanceList);
+    cache.setInstanceConfigMap(instanceConfigs);
+    ClusterConfig clusterConfig = new ClusterConfig("test");
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/Rack/Host");
+    clusterConfig.setFaultZoneType("Rack");
+    cache.setClusterConfig(clusterConfig);
+
+    Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
+    ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy();
+
+    for (String resourceName : resourceNames) {
+      Map<String, Map<String, String>> partitionMap = new HashMap<>();
+
+      strategy.init(resourceName, partitions, states, Integer.MAX_VALUE);
+      partitionMap.putAll(strategy.computePartitionAssignment(instanceNames, instanceNames,
+          new HashMap<String, Map<String, String>>(), cache).getMapFields());
+      result.put(resourceName, partitionMap);
+    }
+
+    Map<String, Integer> weightCount = checkPartitionUsage(result, new PartitionWeightProvider() {
+      @Override
+      public int getPartitionWeight(String resource, String partition) {
+        return defaultCapacity;
+      }
+    });
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+
+    // check for domain assignment
+    Map<String, Set<String>> domainPartitionMap = new HashMap<>();
+    for (Map<String, Map<String, String>> partitionMap : result.values()) {
+      domainPartitionMap.clear();
+      for (String partition : partitionMap.keySet()) {
+        for (String instance : partitionMap.get(partition).keySet()) {
+          String domain = instanceConfigs.get(instance).getDomain().split(",")[0].split("=")[1];
+          if (domainPartitionMap.containsKey(domain)) {
+            Assert.assertFalse(domainPartitionMap.get(domain).contains(partition));
+          } else {
+            domainPartitionMap.put(domain, new HashSet<String>());
+          }
+          domainPartitionMap.get(domain).add(partition);
+        }
+      }
+    }
+  }
+}
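For reference, the core call sequence these tests exercise is: construct the strategy with constraint lists, init it per resource, then compute the assignment. A condensed sketch, assuming the fixtures above (capacityConstraint, evenConstraint, partitions, states, instanceNames, cache):

  ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(
      Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
      Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
  // states maps each state name to its replica count, e.g. ONLINE -> 3.
  strategy.init("resource0", partitions, states, Integer.MAX_VALUE);
  // allNodes and liveNodes are both the full candidate list here, as in the tests.
  ZNRecord record = strategy.computePartitionAssignment(instanceNames, instanceNames,
      Collections.<String, Map<String, String>>emptyMap(), cache);
  // partition -> (instance -> state)
  Map<String, Map<String, String>> newAssignment = record.getMapFields();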

http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java
new file mode 100644
index 0000000..3235c0b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockCapacityProvider.java
@@ -0,0 +1,52 @@
+package org.apache.helix.controller.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockCapacityProvider implements CapacityProvider {
+  private final int _defaultCapacity;
+  private final Map<String, Integer> _capacityMap = new HashMap<>();
+  private final Map<String, Integer> _usageMap = new HashMap<>();
+
+  public MockCapacityProvider(Map<String, Integer> capacityMap, int defaultCapacity) {
+    _capacityMap.putAll(capacityMap);
+    _defaultCapacity = defaultCapacity;
+  }
+
+  @Override
+  public int getParticipantCapacity(String participant) {
+    if (_capacityMap.containsKey(participant)) {
+      return _capacityMap.get(participant);
+    }
+    return _defaultCapacity;
+  }
+
+  @Override
+  public int getParticipantUsage(String participant) {
+    if (_usageMap.containsKey(participant)) {
+      return _usageMap.get(participant);
+    }
+    return 0;
+  }
+}
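The CapacityProvider contract is small, so a production implementation can wrap any metrics source. A hypothetical sketch (the disk-quota maps are illustrative, not part of this commit; it assumes TotalCapacityConstraint rejects placements that exceed the reported capacity, which the conflict test above relies on):

  public class DiskQuotaCapacityProvider implements CapacityProvider {
    private final Map<String, Integer> _quotaMb;
    private final Map<String, Integer> _usedMb;

    public DiskQuotaCapacityProvider(Map<String, Integer> quotaMb, Map<String, Integer> usedMb) {
      _quotaMb = quotaMb;
      _usedMb = usedMb;
    }

    @Override
    public int getParticipantCapacity(String participant) {
      // Unknown participants report zero capacity, so a capacity-based hard
      // constraint will not place replicas on them.
      return _quotaMb.containsKey(participant) ? _quotaMb.get(participant) : 0;
    }

    @Override
    public int getParticipantUsage(String participant) {
      return _usedMb.containsKey(participant) ? _usedMb.get(participant) : 0;
    }
  }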

http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java
new file mode 100644
index 0000000..143e86e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/MockPartitionWeightProvider.java
@@ -0,0 +1,50 @@
+package org.apache.helix.controller.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockPartitionWeightProvider implements PartitionWeightProvider {
+  private final int _defaultWeight;
+  private Map<String, Map<String, Integer>> _partitionWeightMap = new HashMap<>();
+
+  public MockPartitionWeightProvider(int defaultWeight) {
+    // use the default weight
+    _defaultWeight = defaultWeight;
+  }
+
+  public MockPartitionWeightProvider(Map<String, Map<String, Integer>> partitionWeightMap,
+      int defaultWeight) {
+    _partitionWeightMap = partitionWeightMap;
+    _defaultWeight = defaultWeight;
+  }
+
+  @Override
+  public int getPartitionWeight(String resource, String partition) {
+    if (_partitionWeightMap.containsKey(resource) && _partitionWeightMap.get(resource)
+        .containsKey(partition)) {
+      return _partitionWeightMap.get(resource).get(partition);
+    }
+    return _defaultWeight;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java b/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java
new file mode 100644
index 0000000..488fcab
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestWeightBasedRebalanceUtil.java
@@ -0,0 +1,488 @@
+package org.apache.helix.integration;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
+import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockCapacityProvider;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.MockPartitionWeightProvider;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedCapacityProvider;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.model.*;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.WeightAwareRebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+import static org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider.DEFAULT_WEIGHT_VALUE;
+
+public class TestWeightBasedRebalanceUtil extends ZkIntegrationTestBase {
+  private static Logger _logger = LoggerFactory.getLogger(TestWeightBasedRebalanceUtil.class);
+  private static String CLUSTER_NAME;
+  private static ClusterSetup _setupTool;
+
+  final String resourceNamePrefix = "resource";
+  final int nParticipants = 40;
+  final int nResources = 20;
+  final int nPartitions = 100;
+  final int nReplicas = 3;
+  final int defaultCapacity = 6000; // total = 6000*40 = 240000
+  final int resourceWeight = 10; // total = 20*100*3*10 = 60000
+  final String topState = "ONLINE";
+
+  final List<String> resourceNames = new ArrayList<>();
+  final List<String> instanceNames = new ArrayList<>();
+  final List<String> partitions = new ArrayList<>(nPartitions);
+  final List<ResourceConfig> resourceConfigs = new ArrayList<>();
+
+  final LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+
+  final ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+  final List<InstanceConfig> instanceConfigs = new ArrayList<>();
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println(
+        "START " + getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
+
+    CLUSTER_NAME = "MockCluster" + getShortClassName();
+
+    for (int i = 0; i < nParticipants; i++) {
+      instanceNames.add("node" + i);
+    }
+    for (int i = 0; i < nPartitions; i++) {
+      partitions.add(Integer.toString(i));
+    }
+
+    for (int i = 0; i < nResources; i++) {
+      resourceNames.add(resourceNamePrefix + i);
+      ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceNamePrefix + i);
+      resourceBuilder.setStateModelDefRef("OnlineOffline");
+      resourceBuilder.setNumReplica(nReplicas);
+      for (String partition : partitions) {
+        resourceBuilder.setPreferenceList(partition, Collections.EMPTY_LIST);
+      }
+      resourceConfigs.add(resourceBuilder.build());
+    }
+
+    setupMockCluster();
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _setupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  private void setupMockCluster() {
+    for (String instance : instanceNames) {
+      InstanceConfig config = new InstanceConfig(instance);
+      instanceConfigs.add(config);
+    }
+
+    states.put("OFFLINE", 0);
+    states.put(topState, nReplicas);
+  }
+
+  private Map<String, Integer> checkPartitionUsage(ResourcesStateMap assignment,
+      PartitionWeightProvider weightProvider) {
+    Map<String, Integer> weightCount = new HashMap<>();
+    for (String resource : assignment.resourceSet()) {
+      PartitionStateMap partitionMap = assignment.getPartitionStateMap(resource);
+      for (Partition partition : partitionMap.partitionSet()) {
+        // check states
+        Map<String, Integer> stateCount = new HashMap<>(states);
+        Map<String, String> stateMap = partitionMap.getPartitionMap(partition);
+        for (String state : stateMap.values()) {
+          Assert.assertTrue(stateCount.containsKey(state));
+          stateCount.put(state, stateCount.get(state) - 1);
+        }
+        for (int count : stateCount.values()) {
+          Assert.assertEquals(count, 0);
+        }
+
+        // report weight
+        int partitionWeight =
+            weightProvider.getPartitionWeight(resource, partition.getPartitionName());
+        for (String instance : partitionMap.getPartitionMap(partition).keySet()) {
+          if (!weightCount.containsKey(instance)) {
+            weightCount.put(instance, partitionWeight);
+          } else {
+            weightCount.put(instance, weightCount.get(instance) + partitionWeight);
+          }
+        }
+      }
+    }
+    return weightCount;
+  }
+
+  private void validateWeight(PartitionWeightProvider provider) {
+    for (String resource : resourceNames) {
+      for (String partition : partitions) {
+        int weight = provider.getPartitionWeight(resource, partition);
+        if (resource.equals(resourceNames.get(0))) {
+          if (partition.equals(partitions.get(0))) {
+            Assert.assertEquals(weight, resourceWeight * 3);
+          } else {
+            Assert.assertEquals(weight, resourceWeight * 2);
+          }
+        } else if (resource.equals(resourceNames.get(1))) {
+          if (partition.equals(partitions.get(0))) {
+            Assert.assertEquals(weight, resourceWeight * 3);
+          } else {
+            Assert.assertEquals(weight, resourceWeight);
+          }
+        } else {
+          Assert.assertEquals(weight, resourceWeight);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testRebalance() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    MockCapacityProvider capacityProvider = new MockCapacityProvider(capacity, defaultCapacity);
+
+    MockPartitionWeightProvider weightProvider = new MockPartitionWeightProvider(resourceWeight);
+
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+    ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null,
+        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+  }
+
+  @Test
+  public void testZkBasedCapacityProvider() {
+    Map<String, Integer> resourceDefaultWeightMap = new HashMap<>();
+    resourceDefaultWeightMap.put(resourceNames.get(0), resourceWeight * 2);
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    partitionWeightMap
+        .put(resourceNames.get(0), Collections.singletonMap(partitions.get(0), resourceWeight * 3));
+    partitionWeightMap
+        .put(resourceNames.get(1), Collections.singletonMap(partitions.get(0), resourceWeight * 3));
+
+    ZkBasedPartitionWeightProvider weightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Test");
+    weightProvider.updateWeights(resourceDefaultWeightMap, partitionWeightMap, resourceWeight);
+    // verify before persist
+    validateWeight(weightProvider);
+
+    // persist, then read the values back
+    weightProvider.persistWeights();
+    // verify after persist
+    weightProvider = new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Test");
+    validateWeight(weightProvider);
+
+    weightProvider = new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "Fack");
+    for (String resource : resourceNames) {
+      for (String partition : partitions) {
+        Assert.assertEquals(weightProvider.getPartitionWeight(resource, partition),
+            DEFAULT_WEIGHT_VALUE);
+      }
+    }
+
+    // update with invalid value
+    weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, -1);
+    try {
+      weightProvider.persistWeights();
+      Assert.fail("Should fail to persist invalid weight information.");
+    } catch (HelixException hex) {
+      // expected
+    }
+
+    Map<String, Integer> capacity = new HashMap<>();
+    Map<String, Integer> usage = new HashMap<>();
+    for (int i = 0; i < instanceNames.size(); i++) {
+      capacity.put(instanceNames.get(i), defaultCapacity + i);
+      usage.put(instanceNames.get(i), i);
+    }
+
+    ZkBasedCapacityProvider capacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test");
+    capacityProvider.updateCapacity(capacity, usage, defaultCapacity);
+
+    for (String instance : instanceNames) {
+      Assert.assertEquals(capacityProvider.getParticipantCapacity(instance),
+          capacity.get(instance).intValue());
+      Assert.assertEquals(capacityProvider.getParticipantUsage(instance),
+          usage.get(instance).intValue());
+    }
+
+    // persist, then read the values back
+    capacityProvider.persistCapacity();
+    capacityProvider = new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test");
+    for (String instance : instanceNames) {
+      Assert.assertEquals(capacityProvider.getParticipantCapacity(instance),
+          capacity.get(instance).intValue());
+      Assert.assertEquals(capacityProvider.getParticipantUsage(instance),
+          usage.get(instance).intValue());
+    }
+
+    // update usage
+    String targetInstanceName = instanceNames.get(0);
+    int newUsage = 12345;
+    capacityProvider.updateCapacity(Collections.EMPTY_MAP,
+        Collections.singletonMap(targetInstanceName, newUsage), defaultCapacity);
+    Assert.assertEquals(capacityProvider.getParticipantUsage(targetInstanceName), newUsage);
+    // a fresh provider does not see the update, since it has not been persisted to ZK
+    capacityProvider = new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "Test");
+    Assert.assertEquals(capacityProvider.getParticipantUsage(targetInstanceName), 0);
+
+    // update with invalid value
+    capacityProvider.updateCapacity(Collections.EMPTY_MAP, Collections.EMPTY_MAP, -1);
+    try {
+      capacityProvider.persistCapacity();
+      Assert.fail("Should fail to persist invalid weight information.");
+    } catch (HelixException hex) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testRebalanceUsingZkDataProvider() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    ZkBasedPartitionWeightProvider weightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+
+    ZkBasedCapacityProvider capacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0);
+
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+    ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null,
+        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+  }
+
+  @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider")
+  public void testRebalanceWithExistingUsage() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    Map<String, Integer> usage = new HashMap<>();
+    for (int i = 0; i < instanceNames.size(); i++) {
+      String instance = instanceNames.get(i);
+      capacity.put(instance, defaultCapacity);
+      if (i % 7 == 0) {
+        usage.put(instance, defaultCapacity);
+      }
+    }
+
+    ZkBasedPartitionWeightProvider weightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+
+    ZkBasedCapacityProvider capacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    capacityProvider.updateCapacity(capacity, usage, 0);
+
+    TotalCapacityConstraint hardConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+    ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null,
+        Collections.<AbstractRebalanceHardConstraint>singletonList(hardConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+
+    for (int i = 0; i < instanceNames.size(); i++) {
+      String instance = instanceNames.get(i);
+      if (i % 7 == 0) {
+        Assert.assertTrue(!weightCount.containsKey(instance));
+      } else {
+        Assert.assertTrue(weightCount.get(instance) > 0);
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider")
+  public void testRebalanceOption() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    ZkBasedPartitionWeightProvider weightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+
+    ZkBasedCapacityProvider capacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0);
+
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    // Assume existing assignment
+    ResourcesStateMap existingAssignment = new ResourcesStateMap();
+    String targetResource = resourceNames.get(0);
+    for (String partition : partitions) {
+      for (int i = 0; i < nReplicas; i++) {
+        existingAssignment
+            .setState(targetResource, new Partition(partition), instanceNames.get(i), topState);
+      }
+    }
+
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+
+    // INCREMENTAL
+    ResourcesStateMap assignment =
+        util.buildIncrementalRebalanceAssignment(resourceConfigs, existingAssignment,
+            Collections.EMPTY_LIST,
+            Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    // check that the existing assignment is kept
+    for (String partition : partitions) {
+      Assert.assertTrue(
+          assignment.getInstanceStateMap(targetResource, new Partition(partition)).keySet()
+              .containsAll(instanceNames.subList(0, nReplicas)));
+    }
+    // still need to check for balance
+    Map<String, Integer> weightCount = checkPartitionUsage(assignment, weightProvider);
+    int max = Collections.max(weightCount.values());
+    int min = Collections.min(weightCount.values());
+    // Since the accuracy of the default evenness constraint is 0.01, the diff should be at most 1/100 of the participant capacity.
+    Assert.assertTrue((max - min) <= defaultCapacity / 100);
+
+    // FULL
+    assignment = util.buildFullRebalanceAssignment(resourceConfigs, existingAssignment,
+        Collections.EMPTY_LIST,
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    // check that the existing assignment has been changed
+    for (String partition : partitions) {
+      Assert.assertFalse(
+          assignment.getInstanceStateMap(targetResource, new Partition(partition)).keySet()
+              .containsAll(instanceNames.subList(0, nReplicas)));
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalanceUsingZkDataProvider")
+  public void testInvalidInput() {
+    // capacity / weight
+    Map<String, Integer> capacity = new HashMap<>();
+    for (String instance : instanceNames) {
+      capacity.put(instance, defaultCapacity);
+    }
+
+    ZkBasedPartitionWeightProvider weightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    weightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+
+    ZkBasedCapacityProvider capacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
+    capacityProvider.updateCapacity(capacity, Collections.EMPTY_MAP, 0);
+
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+
+    // Empty constraint
+    try {
+      util.buildIncrementalRebalanceAssignment(resourceConfigs, null, Collections.EMPTY_LIST,
+          Collections.EMPTY_LIST);
+      Assert.fail("Should fail due to empty constraint list.");
+    } catch (HelixException ex) {
+      // expected
+    }
+
+    ResourceConfig.Builder invalidResourceBuilder = new ResourceConfig.Builder("InvalidResource");
+    invalidResourceBuilder.setStateModelDefRef("OnlineOffline");
+    invalidResourceBuilder.setNumPartitions(nPartitions);
+    invalidResourceBuilder.setNumReplica(nReplicas);
+    for (String partition : partitions) {
+      invalidResourceBuilder.setPreferenceList(partition, Collections.EMPTY_LIST);
+    }
+
+    // Auto mode resource config
+    try {
+      invalidResourceBuilder
+          .setRebalanceConfig(new RebalanceConfig(new ZNRecord("InvalidResource")));
+      invalidResourceBuilder.getRebalanceConfig()
+          .setRebalanceMode(RebalanceConfig.RebalanceMode.FULL_AUTO);
+      util.buildIncrementalRebalanceAssignment(
+          Collections.singletonList(invalidResourceBuilder.build()), null,
+          Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+          Collections.EMPTY_LIST);
+      Assert.fail("Should fail due to full auto resource config.");
+    } catch (HelixException ex) {
+      // expected
+      invalidResourceBuilder.getRebalanceConfig()
+          .setRebalanceMode(RebalanceConfig.RebalanceMode.CUSTOMIZED);
+    }
+
+    // Unregistered customized state model def
+    try {
+      invalidResourceBuilder.setStateModelDefRef("CustomizedOnlineOffline");
+      util.buildIncrementalRebalanceAssignment(
+          Collections.singletonList(invalidResourceBuilder.build()), null,
+          Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+          Collections.EMPTY_LIST);
+      Assert.fail("Should fail due to unknown state model def ref.");
+    } catch (IllegalArgumentException ex) {
+      // expected
+      util.registerCustomizedStateModelDef("CustomizedOnlineOffline", OnlineOfflineSMD.build());
+      ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(
+          Collections.singletonList(invalidResourceBuilder.build()), null,
+          Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+          Collections.EMPTY_LIST);
+      checkPartitionUsage(assignment, weightProvider);
+    }
+  }
+}
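As testZkBasedCapacityProvider above exercises, the ZK-backed providers follow an update-then-persist pattern: updates are local to the provider instance until persisted, after which a freshly constructed provider reads the same values back. A condensed sketch (the "QPS" dimension name and the maps are illustrative; ZK_ADDR and CLUSTER_NAME as in the test):

  Map<String, Integer> resourceDefaultWeights = new HashMap<>();        // resource -> default weight
  Map<String, Map<String, Integer>> partitionWeights = new HashMap<>(); // resource -> partition -> weight
  ZkBasedPartitionWeightProvider weightProvider =
      new ZkBasedPartitionWeightProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
  weightProvider.updateWeights(resourceDefaultWeights, partitionWeights, 10);
  weightProvider.persistWeights(); // now visible to new provider instances

  Map<String, Integer> capacities = new HashMap<>(); // participant -> capacity
  Map<String, Integer> usages = new HashMap<>();     // participant -> existing usage
  ZkBasedCapacityProvider capacityProvider =
      new ZkBasedCapacityProvider(ZK_ADDR, CLUSTER_NAME, "QPS");
  capacityProvider.updateCapacity(capacities, usages, 6000);
  capacityProvider.persistCapacity();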

