helix-commits mailing list archives

From j...@apache.org
Subject [2/2] helix git commit: [HELIX-674] Introducing a constraint-based rebalancing mechanism.
Date Tue, 24 Apr 2018 00:17:06 GMT
[HELIX-674] Introducing a constraint-based rebalancing mechanism.

Constraints can be customized by the application to restrict how rebalancing is performed.
The change also includes examples that demonstrate how constraints can be defined and used.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3d2d57b0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3d2d57b0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3d2d57b0

Branch: refs/heads/master
Commit: 3d2d57b05af443dcc8658f8b7bfc1fc0d18cd196
Parents: 317c300
Author: jiajunwang <ericwang1985@gmail.com>
Authored: Mon Feb 26 17:21:48 2018 -0800
Committer: jiajunwang <ericwang1985@gmail.com>
Committed: Mon Apr 23 14:18:01 2018 -0700

----------------------------------------------------------------------
 .../AbstractRebalanceHardConstraint.java        |  45 ++
 .../AbstractRebalanceSoftConstraint.java        |  56 +++
 .../dataprovider/CapacityProvider.java          |  48 ++
 .../dataprovider/PartitionWeightProvider.java   |  52 ++
 .../PartitionWeightAwareEvennessConstraint.java |  88 ++++
 .../constraint/TotalCapacityConstraint.java     |  79 +++
 .../dataprovider/ZkBasedCapacityProvider.java   | 202 ++++++++
 .../ZkBasedPartitionWeightProvider.java         | 186 +++++++
 ...stractEvenDistributionRebalanceStrategy.java |  14 +-
 .../strategy/ConstraintRebalanceStrategy.java   | 323 ++++++++++++
 .../CardDealingAdjustmentAlgorithm.java         |  42 +-
 .../util/ResourceUsageCalculator.java           |  36 ++
 .../WeightAwareRebalanceUtilExample.java        | 302 ++++++++++++
 .../helix/util/WeightAwareRebalanceUtil.java    | 216 ++++++++
 .../TestConstraintRebalanceStrategy.java        | 453 +++++++++++++++++
 .../dataprovider/MockCapacityProvider.java      |  52 ++
 .../MockPartitionWeightProvider.java            |  50 ++
 .../TestWeightBasedRebalanceUtil.java           | 488 +++++++++++++++++++
 18 files changed, 2715 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceHardConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceHardConstraint.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceHardConstraint.java
new file mode 100644
index 0000000..7f2559e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceHardConstraint.java
@@ -0,0 +1,45 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.common.ResourcesStateMap;
+
+import java.util.Map;
+
+public abstract class AbstractRebalanceHardConstraint {
+  /**
+   * Return a map of validation results.
+   * @param resource Target resource
+   * @param proposedAssignment Map of <PartitionName, list of possible ParticipantNames>
+   * @return In the returned lists, an entry is true if the proposed assignment does not violate the constraint
+   */
+  public abstract Map<String, boolean[]> isValid(String resource,
+      Map<String, String[]> proposedAssignment);
+
+  /**
+   * Update the constraint state with the pending assignment.
+   * @param pendingAssignment The newly proposed assignment.
+   */
+  public void updateAssignment(ResourcesStateMap pendingAssignment) {
+    // By default, a constraint does not need to track assignment updates.
+    // If the constraint calculation depends on the current assignment,
+    // an implementation can override this method and update its internal state.
+  }
+}

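For orientation, a minimal hard constraint implementation might look like the following sketch (hypothetical class and names, not part of this commit); it only allows placement on a whitelisted set of participants:

import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a replica placement is valid only if the participant
// belongs to a configured whitelist. An entry is true when the placement does
// not violate the constraint.
public class WhitelistHardConstraint extends AbstractRebalanceHardConstraint {
  private final Set<String> _allowedParticipants;

  public WhitelistHardConstraint(Set<String> allowedParticipants) {
    _allowedParticipants = allowedParticipants;
  }

  @Override
  public Map<String, boolean[]> isValid(String resource,
      Map<String, String[]> proposedAssignment) {
    Map<String, boolean[]> result = new HashMap<>();
    for (Map.Entry<String, String[]> entry : proposedAssignment.entrySet()) {
      String[] participants = entry.getValue();
      boolean[] valid = new boolean[participants.length];
      for (int i = 0; i < participants.length; i++) {
        valid[i] = _allowedParticipants.contains(participants[i]);
      }
      result.put(entry.getKey(), valid);
    }
    return result;
  }
}
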
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceSoftConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceSoftConstraint.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceSoftConstraint.java
new file mode 100644
index 0000000..e06ad68
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbstractRebalanceSoftConstraint.java
@@ -0,0 +1,56 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.common.ResourcesStateMap;
+
+import java.util.Map;
+
+public abstract class AbstractRebalanceSoftConstraint {
+  private static final int DEFAULT_WEIGHT = 1;
+  protected int _weight = DEFAULT_WEIGHT;
+
+  /**
+   * Evaluate how well the given assignment fits the constraint.
+   * @param resource Target resource
+   * @param proposedAssignment Map of <PartitionName, list of possible ParticipantNames>
+   * @return Scores for the assignment according to this constraint. A larger number means a better fit.
+   */
+  public abstract Map<String, int[]> evaluate(String resource,
+      Map<String, String[]> proposedAssignment);
+
+  /**
+   * @return The soft constraint's weight, which is used to consolidate the final evaluation score.
+   * Aggregated evaluation score = SUM(constraint_evaluation * weight).
+   */
+  public int getConstraintWeight() {
+    return _weight;
+  }
+
+  /**
+   * Update the constraint state with the pending assignment.
+   * @param pendingAssignment The newly proposed assignment.
+   */
+  public void updateAssignment(ResourcesStateMap pendingAssignment) {
+    // By default, a constraint does not need to track assignment updates.
+    // If the constraint calculation depends on the current assignment,
+    // an implementation can override this method and update its internal state.
+  }
+}

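Similarly, a soft constraint only needs to score each candidate; here is a sketch (hypothetical, not part of this commit) that prefers participants in a given zone:

import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: score participants in the preferred zone higher so the
// strategy favors them; the weight amplifies this constraint's influence when
// several soft constraints are aggregated.
public class ZonePreferenceSoftConstraint extends AbstractRebalanceSoftConstraint {
  private final Map<String, String> _participantZoneMap; // participant -> zone
  private final String _preferredZone;

  public ZonePreferenceSoftConstraint(Map<String, String> participantZoneMap,
      String preferredZone, int weight) {
    _participantZoneMap = participantZoneMap;
    _preferredZone = preferredZone;
    _weight = weight; // protected field inherited from the base class
  }

  @Override
  public Map<String, int[]> evaluate(String resource,
      Map<String, String[]> proposedAssignment) {
    Map<String, int[]> scores = new HashMap<>();
    for (Map.Entry<String, String[]> entry : proposedAssignment.entrySet()) {
      String[] participants = entry.getValue();
      int[] score = new int[participants.length];
      for (int i = 0; i < participants.length; i++) {
        score[i] = _preferredZone.equals(_participantZoneMap.get(participants[i])) ? 100 : 0;
      }
      scores.put(entry.getKey(), score);
    }
    return scores;
  }
}
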
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/CapacityProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/CapacityProvider.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/CapacityProvider.java
new file mode 100644
index 0000000..5f48793
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/CapacityProvider.java
@@ -0,0 +1,48 @@
+package org.apache.helix.api.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An interface for getting participant capacity information.
+ * The return values will be used to estimate available capacity, as well as utilization as a percentage for prioritizing participants.
+ *
+ * Note that all return values of the provider are supposed to be in the same unit.
+ * For example, if the provider is for the memory capacity of a host (total memory size 8 GB, current usage 512 MB),
+ * the return values could be {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantCapacity(String)} = 8192, and {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantUsage(String)} = 512.
+ * As another example, if the provider is for partition count capacity (at most 1000 partitions, currently no partition assigned),
+ * the return values should be {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantCapacity(String)} = 1000, and {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantUsage(String)} = 0.
+ *
+ * Moreover, when this provider is used together with a {@link org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider PartitionWeightProvider},
+ * both providers are supposed to return values in the same unit so they can be used to estimate the resource usage of a proposed assignment.
+ */
+public interface CapacityProvider {
+
+  /**
+   * @param participant The participant name.
+   * @return The total participant capacity.
+   */
+  int getParticipantCapacity(String participant);
+
+  /**
+   * @param participant The participant name.
+   * @return The participant's current usage.
+   */
+  int getParticipantUsage(String participant);
+}

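A trivial in-memory implementation of this interface, for testing or illustration (hypothetical, not part of this commit; values follow the javadoc's memory example, in MB):

import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: capacities and usages held in maps, e.g. 8192/512 for
// the 8 GB / 512 MB memory example above. Unknown participants report zero.
public class InMemoryCapacityProvider implements CapacityProvider {
  private final Map<String, Integer> _capacityMap = new HashMap<>();
  private final Map<String, Integer> _usageMap = new HashMap<>();

  public InMemoryCapacityProvider(Map<String, Integer> capacityMap,
      Map<String, Integer> usageMap) {
    _capacityMap.putAll(capacityMap);
    _usageMap.putAll(usageMap);
  }

  @Override
  public int getParticipantCapacity(String participant) {
    return _capacityMap.containsKey(participant) ? _capacityMap.get(participant) : 0;
  }

  @Override
  public int getParticipantUsage(String participant) {
    return _usageMap.containsKey(participant) ? _usageMap.get(participant) : 0;
  }
}
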
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/PartitionWeightProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/PartitionWeightProvider.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/PartitionWeightProvider.java
new file mode 100644
index 0000000..e4afe7a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/dataprovider/PartitionWeightProvider.java
@@ -0,0 +1,52 @@
+package org.apache.helix.api.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An interface for getting partition weight information.
+ * This provider is supposed to return an estimate of the max resource usage if a partition is assigned to a participant.
+ *
+ * The return value will be used to calculate the resource requirement of a proposed assignment and to estimate whether the assignment is feasible.
+ * In detail,
+ * NewResourceUsage = {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantUsage(String)} + SUM({@link org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider#getPartitionWeight(String, String)})
+ * And for validation, the constraint checks whether {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider#getParticipantCapacity(String) TotalCapacity} >= NewResourceUsage.
+ *
+ * For example, a database resource partition will need a certain amount of storage, as well as CPU, memory, and network IO.
+ * If a provider is implemented for estimating storage usage, for instance,
+ * {@link org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider#getPartitionWeight(String, String)} should return the
+ * disk size that needs to be reserved for this partition (for example, return 10 for 10 GB).
+ * Likewise, if another provider is implemented for estimating memory usage, {@link org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider#getPartitionWeight(String, String)}
+ * should return the max memory that could be used by this partition (for example, return 100 for 100 MB of memory usage).
+ *
+ * When this provider is used together with a {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider CapacityProvider},
+ * both providers are supposed to return values in the same unit so they can be used to estimate the resource usage of a proposed assignment.
+ * For example, if the providers are designed for memory usage, and the {@link org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider CapacityProvider}
+ * returns values in MB, then {@link org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider#getPartitionWeight(String, String)} should also return
+ * the weight in MB.
+ */
+public interface PartitionWeightProvider {
+
+  /**
+   * @param resource The resource name.
+   * @param partition The partition name.
+   * @return The weight (in units of participant capacity) that is required by the given partition of the specified resource
+   */
+  int getPartitionWeight(String resource, String partition);
+}

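The weight side can be equally simple; a sketch (hypothetical, not part of this commit) that assigns a flat weight per resource:

import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;

// Hypothetical sketch: every partition of the "db" resource weighs 10 (e.g.
// 10 GB of reserved disk), everything else weighs 1. Units must match the
// paired CapacityProvider.
public class StaticPartitionWeightProvider implements PartitionWeightProvider {
  @Override
  public int getPartitionWeight(String resource, String partition) {
    return "db".equals(resource) ? 10 : 1;
  }
}
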
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/PartitionWeightAwareEvennessConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/PartitionWeightAwareEvennessConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/PartitionWeightAwareEvennessConstraint.java
new file mode 100644
index 0000000..af5e462
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/PartitionWeightAwareEvennessConstraint.java
@@ -0,0 +1,88 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PartitionWeightAwareEvennessConstraint extends AbstractRebalanceSoftConstraint {
+  private final PartitionWeightProvider _partitionWeightProvider;
+  private final CapacityProvider _capacityProvider;
+  // Used to track any assignments that are proposed during the rebalance process.
+  // Note that these assignments are not reflected in the providers.
+  private final Map<String, Integer> _pendingUsage;
+
+  public PartitionWeightAwareEvennessConstraint(PartitionWeightProvider partitionWeightProvider,
+      CapacityProvider capacityProvider) {
+    super();
+    _partitionWeightProvider = partitionWeightProvider;
+    _capacityProvider = capacityProvider;
+    _pendingUsage = new HashMap<>();
+  }
+
+  /**
+   * @return The remaining capacity as a percentage.
+   * The evenness accuracy of this constraint is 1/100.
+   */
+  private int evaluate(String resource, String partition, String participant) {
+    double capacity = _capacityProvider.getParticipantCapacity(participant);
+    if (capacity == 0) {
+      return 0;
+    }
+    double usage = _capacityProvider.getParticipantUsage(participant) + (_pendingUsage
+        .containsKey(participant) ? _pendingUsage.get(participant) : 0);
+    double available =
+        capacity - usage - _partitionWeightProvider.getPartitionWeight(resource, partition);
+    return (int) Math.ceil((available / capacity) * 100);
+  }
+
+  @Override
+  public Map<String, int[]> evaluate(String resource, Map<String, String[]> proposedAssignment) {
+    Map<String, int[]> result = new HashMap<>();
+    for (String partition : proposedAssignment.keySet()) {
+      String[] participants = proposedAssignment.get(partition);
+      int[] evaluateResults = new int[participants.length];
+      for (int i = 0; i < participants.length; i++) {
+        evaluateResults[i] = evaluate(resource, partition, participants[i]);
+      }
+      result.put(partition, evaluateResults);
+    }
+    return result;
+  }
+
+  @Override
+  public void updateAssignment(ResourcesStateMap pendingAssignment) {
+    Map<String, Integer> newParticipantUsage =
+        ResourceUsageCalculator.getResourceUsage(pendingAssignment, _partitionWeightProvider);
+    for (String participant : newParticipantUsage.keySet()) {
+      if (!_pendingUsage.containsKey(participant)) {
+        _pendingUsage.put(participant, 0);
+      }
+      _pendingUsage
+          .put(participant, _pendingUsage.get(participant) + newParticipantUsage.get(participant));
+    }
+  }
+}

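To see the scoring in action (hypothetical numbers, reusing the provider sketches above): with capacity 100, usage 40, and partition weight 10, the remaining capacity is 50%.

import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;

import java.util.Collections;
import java.util.Map;

public class EvennessConstraintDemo {
  public static void main(String[] args) {
    // Hypothetical wiring, using the sketch providers above.
    PartitionWeightAwareEvennessConstraint constraint =
        new PartitionWeightAwareEvennessConstraint(
            new StaticPartitionWeightProvider(),         // weight("db", *) = 10
            new InMemoryCapacityProvider(
                Collections.singletonMap("node1", 100),  // capacity = 100
                Collections.singletonMap("node1", 40))); // usage = 40
    Map<String, int[]> scores = constraint.evaluate("db",
        Collections.singletonMap("db_0", new String[] { "node1" }));
    // ceil((100 - 40 - 10) / 100.0 * 100) = 50 (% remaining capacity)
    System.out.println(scores.get("db_0")[0]);
  }
}
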
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/TotalCapacityConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/TotalCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/TotalCapacityConstraint.java
new file mode 100644
index 0000000..f76c555
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/TotalCapacityConstraint.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TotalCapacityConstraint extends AbstractRebalanceHardConstraint {
+  private final PartitionWeightProvider _partitionWeightProvider;
+  private final CapacityProvider _capacityProvider;
+  // Used to track any assignments that are proposed during the rebalance process.
+  // Note that these assignments are not reflected in the providers.
+  private final Map<String, Integer> _pendingUsage;
+
+  public TotalCapacityConstraint(PartitionWeightProvider partitionWeightProvider,
+      CapacityProvider capacityProvider) {
+    super();
+    _partitionWeightProvider = partitionWeightProvider;
+    _capacityProvider = capacityProvider;
+    _pendingUsage = new HashMap<>();
+  }
+
+  private boolean validate(String resource, String partition, String participant) {
+    int usage = _capacityProvider.getParticipantUsage(participant) + (_pendingUsage
+        .containsKey(participant) ? _pendingUsage.get(participant) : 0);
+    return _partitionWeightProvider.getPartitionWeight(resource, partition) + usage
+        <= _capacityProvider.getParticipantCapacity(participant);
+  }
+
+  @Override
+  public Map<String, boolean[]> isValid(String resource, Map<String, String[]> proposedAssignment) {
+    Map<String, boolean[]> result = new HashMap<>();
+    for (String partition : proposedAssignment.keySet()) {
+      String[] participants = proposedAssignment.get(partition);
+      boolean[] validateResults = new boolean[participants.length];
+      for (int i = 0; i < participants.length; i++) {
+        validateResults[i] = validate(resource, partition, participants[i]);
+      }
+      result.put(partition, validateResults);
+    }
+    return result;
+  }
+
+  @Override
+  public void updateAssignment(ResourcesStateMap pendingAssignment) {
+    Map<String, Integer> newParticipantUsage =
+        ResourceUsageCalculator.getResourceUsage(pendingAssignment, _partitionWeightProvider);
+    for (String participant : newParticipantUsage.keySet()) {
+      if (!_pendingUsage.containsKey(participant)) {
+        _pendingUsage.put(participant, 0);
+      }
+      _pendingUsage
+          .put(participant, _pendingUsage.get(participant) + newParticipantUsage.get(participant));
+    }
+  }
+}

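The hard-constraint counterpart rejects placements that would exceed total capacity (hypothetical numbers, same provider sketches): usage 95 plus weight 10 exceeds capacity 100, so the placement is invalid.

import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;

import java.util.Collections;
import java.util.Map;

public class CapacityConstraintDemo {
  public static void main(String[] args) {
    TotalCapacityConstraint constraint = new TotalCapacityConstraint(
        new StaticPartitionWeightProvider(),         // weight("db", *) = 10
        new InMemoryCapacityProvider(
            Collections.singletonMap("node1", 100),  // capacity = 100
            Collections.singletonMap("node1", 95))); // usage = 95
    Map<String, boolean[]> valid = constraint.isValid("db",
        Collections.singletonMap("db_0", new String[] { "node1" }));
    // 95 + 10 > 100, so the proposed placement violates the constraint.
    System.out.println(valid.get("db_0")[0]); // false
  }
}
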
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedCapacityProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedCapacityProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedCapacityProvider.java
new file mode 100644
index 0000000..0172b24
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedCapacityProvider.java
@@ -0,0 +1,202 @@
+package org.apache.helix.controller.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.*;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A capacity provider based on a ZooKeeper node.
+ * This class supports persistence through the Helix Property Store.
+ */
+public class ZkBasedCapacityProvider implements CapacityProvider {
+  public static final int DEFAULT_CAPACITY_VALUE = 0;
+  private static final String ROOT = "/PARTICIPANT_CAPACITY";
+
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final String _dimensionPath;
+  private ParticipantCapacity _capacity;
+
+  /**
+   * @param propertyStore The store that will be used to persist capacity information.
+   * @param dimensionName Identifier of the capacity attribute, for example memory or CPU.
+   */
+  public ZkBasedCapacityProvider(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String dimensionName) {
+    _propertyStore = propertyStore;
+    _dimensionPath = ROOT + "/" + dimensionName;
+
+    ZNRecord existingRecord = _propertyStore.get(_dimensionPath, null, AccessOption.PERSISTENT);
+    if (existingRecord == null) {
+      // Create a capacity object using default capacity (DEFAULT_CAPACITY_VALUE).
+      _capacity = new ParticipantCapacity(dimensionName);
+    } else {
+      _capacity = new ParticipantCapacity(existingRecord);
+    }
+  }
+
+  /**
+   * @param zkAddr ZooKeeper address.
+   * @param clusterName The cluster name.
+   * @param dimensionName Identifier of the capacity attribute, for example memory or CPU.
+   *                      Needs to match the resource weight dimension.
+   */
+  public ZkBasedCapacityProvider(String zkAddr, String clusterName, String dimensionName) {
+    this(new ZkHelixPropertyStore<ZNRecord>(zkAddr, new ZNRecordSerializer(),
+        PropertyPathBuilder.propertyStore(clusterName)), dimensionName);
+  }
+
+  /**
+   * Update capacity information.
+   *
+   * @param capacityMap     <ParticipantName, Total Participant Capacity>
+   * @param usageMap        <ParticipantName, Usage>
+   * @param defaultCapacity Default total capacity if not specified in the map
+   */
+  public void updateCapacity(Map<String, Integer> capacityMap, Map<String, Integer> usageMap,
+      int defaultCapacity) {
+    for (String participant : capacityMap.keySet()) {
+      _capacity.setCapacity(participant, capacityMap.get(participant));
+    }
+    for (String participant : usageMap.keySet()) {
+      _capacity.setUsage(participant, usageMap.get(participant));
+    }
+    _capacity.setDefaultCapacity(defaultCapacity);
+  }
+
+  /**
+   * @return True if the capacity information was successfully written to ZK.
+   */
+  public boolean persistCapacity() {
+    if (_capacity.isValid()) {
+      return _propertyStore.set(_dimensionPath, _capacity.getRecord(), AccessOption.PERSISTENT);
+    } else {
+      throw new HelixException("Invalid ParticipantCapacity: " + _capacity.getRecord().toString());
+    }
+  }
+
+  @Override
+  public int getParticipantCapacity(String participant) {
+    return _capacity.getCapacity(participant);
+  }
+
+  @Override
+  public int getParticipantUsage(String participant) {
+    return _capacity.getUsage(participant);
+  }
+
+  /**
+   * Data model for participant capacity.
+   * Per-participant capacity and usage are recorded in the mapFields.
+   */
+  private static class ParticipantCapacity extends HelixProperty {
+    private static final String CAPACITY = "CAPACITY";
+    private static final String USAGE = "USAGE_SIZE";
+
+    enum ParticipantCapacityProperty {
+      DEFAULT_CAPACITY,
+    }
+
+    ParticipantCapacity(String dimensionName) {
+      super(dimensionName);
+      _record
+          .setIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), DEFAULT_CAPACITY_VALUE);
+    }
+
+    ParticipantCapacity(ZNRecord record) {
+      super(record);
+      if (!isValid()) {
+        throw new HelixException("Invalid ParticipantCapacity: " + record.toString());
+      }
+    }
+
+    int getCapacity(String participant) {
+      Map<String, String> participantMap = _record.getMapField(participant);
+      if (participantMap != null && participantMap.containsKey(CAPACITY)) {
+        return Integer.parseInt(participantMap.get(CAPACITY));
+      }
+      return getDefaultCapacity();
+    }
+
+    int getUsage(String participant) {
+      Map<String, String> participantMap = _record.getMapField(participant);
+      if (participantMap != null && participantMap.containsKey(USAGE)) {
+        return Integer.parseInt(participantMap.get(USAGE));
+      }
+      return 0;
+    }
+
+    void setCapacity(String participant, int capacity) {
+      Map<String, String> participantMap = getOrAddParticipantMap(participant);
+      participantMap.put(CAPACITY, String.valueOf(capacity));
+    }
+
+    void setUsage(String participant, int usage) {
+      Map<String, String> participantMap = getOrAddParticipantMap(participant);
+      participantMap.put(USAGE, String.valueOf(usage));
+    }
+
+    private Map<String, String> getOrAddParticipantMap(String participant) {
+      Map<String, String> participantMap = _record.getMapField(participant);
+      if (participantMap == null) {
+        participantMap = new HashMap<>();
+        _record.setMapField(participant, participantMap);
+      }
+      return participantMap;
+    }
+
+    void setDefaultCapacity(int defaultCapacity) {
+      _record.setIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), defaultCapacity);
+    }
+
+    private int getDefaultCapacity() {
+      return _record
+          .getIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), DEFAULT_CAPACITY_VALUE);
+    }
+
+    @Override
+    public boolean isValid() {
+      try {
+        // check default capacity
+        int defaultCapacity = getDefaultCapacity();
+        if (defaultCapacity < 0) {
+          return false;
+        }
+        // check if any invalid capacity values
+        for (Map<String, String> capacityRecords : _record.getMapFields().values()) {
+          if ((capacityRecords.containsKey(CAPACITY)
+              && Integer.parseInt(capacityRecords.get(CAPACITY)) < 0) || (
+              capacityRecords.containsKey(USAGE)
+                  && Integer.parseInt(capacityRecords.get(USAGE)) < 0)) {
+            return false;
+          }
+        }
+        return true;
+      } catch (Exception ex) {
+        return false;
+      }
+    }
+  }
+}

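Typical usage of the ZK-backed provider might look like this (a sketch; the ZK address, cluster name, and participant names are placeholders):

import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedCapacityProvider;

import java.util.Collections;

public class ZkCapacityProviderDemo {
  public static void main(String[] args) {
    // Placeholder ZK address and cluster name.
    ZkBasedCapacityProvider provider =
        new ZkBasedCapacityProvider("localhost:2181", "MyCluster", "MEMORY");
    provider.updateCapacity(
        Collections.singletonMap("node1", 8192), // total capacity, MB
        Collections.singletonMap("node1", 512),  // current usage, MB
        0);                                      // default capacity for unlisted participants
    provider.persistCapacity(); // writes to <propertyStore>/PARTICIPANT_CAPACITY/MEMORY
    System.out.println(provider.getParticipantCapacity("node1")); // 8192
    System.out.println(provider.getParticipantUsage("node1"));    // 512
  }
}
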
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedPartitionWeightProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedPartitionWeightProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedPartitionWeightProvider.java
new file mode 100644
index 0000000..8cd46b3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/dataprovider/ZkBasedPartitionWeightProvider.java
@@ -0,0 +1,186 @@
+package org.apache.helix.controller.rebalancer.constraint.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.*;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A resource weight provider based on a ZooKeeper node.
+ * This class supports persistence through the Helix Property Store.
+ */
+public class ZkBasedPartitionWeightProvider implements PartitionWeightProvider {
+  public static final int DEFAULT_WEIGHT_VALUE = 1;
+  private static final String ROOT = "/RESOURCE_WEIGHT";
+
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final String _dimensionPath;
+  private PartitionWeight _weights;
+
+  /**
+   * @param propertyStore The store that will be used to persist weight information.
+   * @param dimensionName Identifier of the weight attribute, for example memory or CPU.
+   */
+  public ZkBasedPartitionWeightProvider(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String dimensionName) {
+    _propertyStore = propertyStore;
+    _dimensionPath = ROOT + "/" + dimensionName;
+
+    ZNRecord existingRecord = _propertyStore.get(_dimensionPath, null, AccessOption.PERSISTENT);
+    if (existingRecord == null) {
+      // Create a weight object that returns the default weight only (DEFAULT_WEIGHT_VALUE).
+      _weights = new PartitionWeight(dimensionName);
+    } else {
+      _weights = new PartitionWeight(existingRecord);
+    }
+  }
+
+  /**
+   * @param zkAddr ZooKeeper address.
+   * @param clusterName The cluster name.
+   * @param dimensionName Identifier of the weight attribute, for example memory or CPU.
+   *                      Needs to match the resource weight dimension.
+   */
+  public ZkBasedPartitionWeightProvider(String zkAddr, String clusterName, String dimensionName) {
+    this(new ZkHelixPropertyStore<ZNRecord>(zkAddr, new ZNRecordSerializer(),
+        PropertyPathBuilder.propertyStore(clusterName)), dimensionName);
+  }
+
+  /**
+   * Update resource weight information.
+   * @param resourceDefaultWeightMap <ResourceName, Resource-level default partition weight>
+   * @param partitionWeightMap <ResourceName, <PartitionName, Partition weight>>
+   * @param defaultWeight Overall default partition weight
+   */
+  public void updateWeights(Map<String, Integer> resourceDefaultWeightMap,
+      Map<String, Map<String, Integer>> partitionWeightMap, int defaultWeight) {
+    for (String resource : resourceDefaultWeightMap.keySet()) {
+      _weights.setResourceDefaultWeight(resource, resourceDefaultWeightMap.get(resource));
+    }
+    for (String resource : partitionWeightMap.keySet()) {
+      Map<String, Integer> detailMap = partitionWeightMap.get(resource);
+      for (String partition : detailMap.keySet()) {
+        _weights.setPartitionWeight(resource, partition, detailMap.get(partition));
+      }
+    }
+    _weights.setDefaultWeight(defaultWeight);
+  }
+
+  /**
+   * @return True if the weight information was successfully written to ZK.
+   */
+  public boolean persistWeights() {
+    if (_weights.isValid()) {
+      return _propertyStore.set(_dimensionPath, _weights.getRecord(), AccessOption.PERSISTENT);
+    } else {
+      throw new HelixException("Invalid ParticipantCapacity: " + _weights.getRecord().toString());
+    }
+  }
+
+  @Override
+  public int getPartitionWeight(String resource, String partition) {
+    return _weights.getWeight(resource, partition);
+  }
+
+  /**
+   * Data model for partition weight.
+   * Per-resource default weights and the overall default weight are stored in the simpleFields.
+   * Per-partition weights are stored in the mapFields.
+   */
+  private static class PartitionWeight extends HelixProperty {
+    enum ResourceWeightProperty {
+      DEFAULT_WEIGHT,
+      DEFAULT_RESOURCE_WEIGHT
+    }
+
+    PartitionWeight(String dimensionName) {
+      super(dimensionName);
+      _record.setIntField(ResourceWeightProperty.DEFAULT_WEIGHT.name(), DEFAULT_WEIGHT_VALUE);
+    }
+
+    PartitionWeight(ZNRecord record) {
+      super(record);
+      if (!isValid()) {
+        throw new HelixException("Invalid ResourceWeight: " + record.toString());
+      }
+    }
+
+    private String getWeightKey(String resource) {
+      return ResourceWeightProperty.DEFAULT_RESOURCE_WEIGHT.name() + "_" + resource;
+    }
+
+    int getWeight(String resource, String partition) {
+      Map<String, String> partitionWeightMap = _record.getMapField(resource);
+      if (partitionWeightMap != null && partitionWeightMap.containsKey(partition)) {
+        return Integer.parseInt(partitionWeightMap.get(partition));
+      }
+      return _record.getIntField(getWeightKey(resource), getDefaultWeight());
+    }
+
+    void setResourceDefaultWeight(String resource, int weight) {
+      _record.setIntField(getWeightKey(resource), weight);
+    }
+
+    void setPartitionWeight(String resource, String partition, int weight) {
+      Map<String, String> partitionWeightMap = _record.getMapField(resource);
+      if (partitionWeightMap == null) {
+        partitionWeightMap = new HashMap<>();
+        _record.setMapField(resource, partitionWeightMap);
+      }
+      partitionWeightMap.put(partition, String.valueOf(weight));
+    }
+
+    void setDefaultWeight(int defaultWeight) {
+      _record.setIntField(ResourceWeightProperty.DEFAULT_WEIGHT.name(), defaultWeight);
+    }
+
+    private int getDefaultWeight() {
+      return _record.getIntField(ResourceWeightProperty.DEFAULT_WEIGHT.name(), DEFAULT_WEIGHT_VALUE);
+    }
+
+    @Override
+    public boolean isValid() {
+      try {
+        // check all default weights
+        for (String weightStr : _record.getSimpleFields().values()) {
+          if (Integer.parseInt(weightStr) < 0) {
+            return false;
+          }
+        }
+        // check all partition weights
+        for (Map<String, String> partitionWeights : _record.getMapFields().values()) {
+          for (String weightStr : partitionWeights.values()) {
+            if (Integer.parseInt(weightStr) < 0) {
+              return false;
+            }
+          }
+        }
+        return true;
+      } catch (Exception ex) {
+        return false;
+      }
+    }
+  }
+}

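And the weight counterpart, showing the three-level default resolution (a sketch; address and names are placeholders):

import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider;

import java.util.Collections;

public class ZkWeightProviderDemo {
  public static void main(String[] args) {
    ZkBasedPartitionWeightProvider provider =
        new ZkBasedPartitionWeightProvider("localhost:2181", "MyCluster", "MEMORY");
    provider.updateWeights(
        Collections.singletonMap("db", 10),        // resource-level default weight
        Collections.singletonMap("db",
            Collections.singletonMap("db_0", 20)), // per-partition override
        1);                                        // overall default weight
    provider.persistWeights();
    System.out.println(provider.getPartitionWeight("db", "db_0"));   // 20: partition override
    System.out.println(provider.getPartitionWeight("db", "db_1"));   // 10: resource default
    System.out.println(provider.getPartitionWeight("other", "p_0")); // 1: overall default
  }
}
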
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index c3093b1..a7c870b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -39,11 +39,17 @@ import java.util.concurrent.ExecutionException;
 public abstract class AbstractEvenDistributionRebalanceStrategy implements RebalanceStrategy {
   private static final Logger _logger =
       LoggerFactory.getLogger(AbstractEvenDistributionRebalanceStrategy.class);
-  private String _resourceName;
-  private int _replica;
+  protected String _resourceName;
+  protected int _replica;
 
   protected abstract RebalanceStrategy getBaseRebalanceStrategy();
 
+  protected CardDealingAdjustmentAlgorithm getCardDealingAlgorithm(Topology topology) {
+    // By default, minimize partition movement when calculating for evenness.
+    return new CardDealingAdjustmentAlgorithm(topology, _replica,
+        CardDealingAdjustmentAlgorithm.Mode.MINIMIZE_MOVEMENT);
+  }
+
   @Override
   public void init(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states, int maximumPerNode) {
@@ -82,8 +88,8 @@ public abstract class AbstractEvenDistributionRebalanceStrategy implements Rebal
       // Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution.
       Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(),
           clusterData.getClusterConfig());
-      CardDealingAdjustmentAlgorithm cardDealer =
-          new CardDealingAdjustmentAlgorithm(allNodeTopo, _replica);
+      CardDealingAdjustmentAlgorithm cardDealer = getCardDealingAlgorithm(allNodeTopo);
+
       if (cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
         // Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform.
         finalPartitionMap = shufflePreferenceList(nodeToPartitionMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
new file mode 100644
index 0000000..fe3c89d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
@@ -0,0 +1,323 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Constraint-based rebalance strategy.
+ * The assignment is calculated according to the specified constraints.
+ */
+public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalanceStrategy {
+  private static final Logger _logger = LoggerFactory.getLogger(ConstraintRebalanceStrategy.class);
+  // The minimum weight used for assigning partitions to instances that are restricted by soft constraints.
+  private static final int MIN_INSTANCE_WEIGHT = 1;
+
+  // CRUSH ensures determinism and evenness
+  private final RebalanceStrategy _baseStrategy = new CrushRebalanceStrategy();
+
+  protected RebalanceStrategy getBaseRebalanceStrategy() {
+    return _baseStrategy;
+  }
+
+  private final List<AbstractRebalanceHardConstraint> _hardConstraints = new ArrayList<>();
+  private final List<AbstractRebalanceSoftConstraint> _softConstraints = new ArrayList<>();
+
+  private List<String> _partitions;
+  private int _maxPerNode;
+
+  // resource replica state requirements
+  private LinkedHashMap<String, Integer> _states;
+  // the state requirements expanded into an ordered list
+  private List<String> _orderedStateList;
+
+  public ConstraintRebalanceStrategy(
+      List<? extends AbstractRebalanceHardConstraint> hardConstraints,
+      List<? extends AbstractRebalanceSoftConstraint> softConstraints) {
+    if (hardConstraints != null) {
+      _hardConstraints.addAll(hardConstraints);
+    }
+    if (softConstraints != null) {
+      _softConstraints.addAll(softConstraints);
+    }
+    if (_hardConstraints.isEmpty() && _softConstraints.isEmpty()) {
+      throw new HelixException(
+          "Failed to construct ConstraintRebalanceStrategy since no constraint is provided.");
+    }
+  }
+
+  /**
+   * This strategy is currently for the rebalance tool only.
+   * This default constructor, used when the strategy is constructed for the AutoRebalancer, applies a simplified default constraint to ensure balance.
+   *
+   * Note that this strategy will almost certainly flip-flop if used directly in the existing rebalancer.
+   * TODO Enable different constraints for the automatic rebalance process in the controller later.
+   */
+  public ConstraintRebalanceStrategy() {
+    _logger.debug("Init constraint rebalance strategy using the default even constraint.");
+    PartitionWeightAwareEvennessConstraint defaultConstraint =
+        new PartitionWeightAwareEvennessConstraint(new PartitionWeightProvider() {
+          @Override
+          public int getPartitionWeight(String resource, String partition) {
+            return 1;
+          }
+        }, new CapacityProvider() {
+          @Override
+          public int getParticipantCapacity(String participant) {
+            return MIN_INSTANCE_WEIGHT;
+          }
+
+          @Override
+          public int getParticipantUsage(String participant) {
+            return 0;
+          }
+        });
+    _softConstraints.add(defaultConstraint);
+  }
+
+  protected CardDealingAdjustmentAlgorithm getCardDealingAlgorithm(Topology topology) {
+    // The constraint-based strategy needs a more fine-grained assignment for each partition,
+    // so evenness is more important.
+    return new CardDealingAdjustmentAlgorithm(topology, _replica,
+        CardDealingAdjustmentAlgorithm.Mode.EVENNESS);
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = new ArrayList<>(partitions);
+    _maxPerNode = maximumPerNode;
+    _states = states;
+    _orderedStateList = new ArrayList<>();
+    for (String state : states.keySet()) {
+      for (int i = 0; i < states.get(state); i++) {
+        _orderedStateList.add(state);
+      }
+    }
+  }
+
+  /**
+   * Generate an assignment based on the constraints.
+   *
+   * @param allNodes         All instances
+   * @param liveNodes        List of live instances
+   * @param currentMapping   The current replica mapping. It is used directly if it meets the state model requirements.
+   * @param clusterData      Cluster data
+   * @return An IdealState ZNRecord that contains both the preference lists and a proposed state mapping.
+   * @throws HelixException
+   */
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    // Since instance weights will be overwritten by the constraint evaluation, record the original
+    // weights in advance so they can be restored afterwards.
+    Map<String, Integer> instanceWeightRecords = new HashMap<>();
+    for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) {
+      if (instanceConfig.getWeight() != InstanceConfig.WEIGHT_NOT_SET) {
+        instanceWeightRecords.put(instanceConfig.getInstanceName(), instanceConfig.getWeight());
+      }
+    }
+
+    List<String> candidates = new ArrayList<>(allNodes);
+    // Only calculate for configured nodes.
+    // Remove all non-configured nodes.
+    candidates.retainAll(clusterData.getInstanceConfigMap().keySet());
+
+    // For generating the IdealState ZNRecord
+    Map<String, List<String>> preferenceList = new HashMap<>();
+    Map<String, Map<String, String>> idealStateMap = new HashMap<>();
+
+    for (String partition : _partitions) {
+      if (currentMapping.containsKey(partition)) {
+        Map<String, String> partitionMapping = currentMapping.get(partition);
+        // check for the preferred assignment
+        partitionMapping = validateStateMap(partitionMapping);
+        if (partitionMapping != null) {
+          _logger.debug(
+              "The provided preferred partition assignment meets state model requirements. Skip rebalance.");
+          preferenceList.put(partition, new ArrayList<>(partitionMapping.keySet()));
+          idealStateMap.put(partition, partitionMapping);
+          updateConstraints(partition, partitionMapping);
+          continue;
+        }
+      } // else, recalculate the assignment
+      List<String> assignment =
+          computeSinglePartitionAssignment(partition, candidates, liveNodes, clusterData);
+
+      // Shuffle the list.
+      // Note that a single-partition assignment does not have enough inputs to ensure state evenness,
+      // so shuffle it again here.
+      Collections.shuffle(assignment,
+          new Random(String.format("%s.%s", _resourceName, partition).hashCode()));
+
+      // Calculate for replica states
+      Map<String, String> stateMap = new HashMap<>();
+      for (int i = 0; i < assignment.size(); i++) {
+        stateMap.put(assignment.get(i), _orderedStateList.get(i));
+      }
+
+      // Update ideal states & preference lists
+      idealStateMap.put(partition, stateMap);
+      preferenceList.put(partition, assignment);
+      // Note: only update the constraints with the new pending assignment
+      updateConstraints(partition, stateMap);
+    }
+
+    // restore the original instance weights
+    for (String instanceName : instanceWeightRecords.keySet()) {
+      clusterData.getInstanceConfigMap().get(instanceName)
+          .setWeight(instanceWeightRecords.get(instanceName));
+    }
+
+    ZNRecord result = new ZNRecord(_resourceName);
+    result.setListFields(preferenceList);
+    result.setMapFields(idealStateMap);
+    return result;
+  }
+
+  /**
+   * @param actualMapping The preferred state mapping to validate.
+   * @return A filtered state mapping that fits the state model definition,
+   *          or null if the input mapping conflicts with the state model.
+   */
+  private Map<String, String> validateStateMap(Map<String, String> actualMapping) {
+    Map<String, String> filteredStateMapping = new HashMap<>();
+
+    Map<String, Integer> tmpStates = new HashMap<>(_states);
+    for (String partition : actualMapping.keySet()) {
+      String state = actualMapping.get(partition);
+      if (tmpStates.containsKey(state)) {
+        int count = tmpStates.get(state);
+        if (count > 0) {
+          filteredStateMapping.put(partition, state);
+          tmpStates.put(state, count - 1);
+        }
+      }
+    }
+
+    for (String state : tmpStates.keySet()) {
+      if (tmpStates.get(state) > 0) {
+        return null;
+      }
+    }
+    return filteredStateMapping;
+  }
+
+  /**
+   * Calculate a fine-grained assignment for all replicas of a single partition.
+   *
+   * @param partitionName The partition to assign.
+   * @param allNodes All candidate instances.
+   * @param liveNodes List of live instances.
+   * @param clusterData Cluster data.
+   * @return The ordered list of participants assigned to the partition.
+   */
+  private List<String> computeSinglePartitionAssignment(String partitionName,
+      final List<String> allNodes, final List<String> liveNodes, ClusterDataCache clusterData) {
+    List<String> qualifiedNodes = new ArrayList<>(allNodes);
+
+    // do hard constraints check and find all qualified instances
+    for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) {
+      Map<String, String[]> proposedAssignment = Collections
+          .singletonMap(partitionName, qualifiedNodes.toArray(new String[qualifiedNodes.size()]));
+      boolean[] validateResults =
+          hardConstraint.isValid(_resourceName, proposedAssignment).get(partitionName);
+      for (int i = 0; i < validateResults.length; i++) {
+        if (!validateResults[i]) {
+          qualifiedNodes.remove(proposedAssignment.get(partitionName)[i]);
+        }
+      }
+    }
+
+    int[] instancePriority = new int[qualifiedNodes.size()];
+    Map<String, String[]> proposedAssignment = Collections
+        .singletonMap(partitionName, qualifiedNodes.toArray(new String[qualifiedNodes.size()]));
+    for (AbstractRebalanceSoftConstraint softConstraint : _softConstraints) {
+      if (softConstraint.getConstraintWeight() == 0) {
+        continue;
+      }
+
+      int[] evaluateResults =
+          softConstraint.evaluate(_resourceName, proposedAssignment).get(partitionName);
+      for (int i = 0; i < evaluateResults.length; i++) {
+        // accumulate all evaluate results
+        instancePriority[i] += evaluateResults[i] * softConstraint.getConstraintWeight();
+      }
+    }
+
+    // Since evaluation results can be negative, use the minimum result as the baseline when normalizing all priorities into weights.
+    int baseline = Integer.MAX_VALUE;
+    for (int priority : instancePriority) {
+      if (baseline > priority) {
+        baseline = priority;
+      }
+    }
+    // Limit the weight to be at least MIN_INSTANCE_WEIGHT
+    for (int i = 0; i < instancePriority.length; i++) {
+      clusterData.getInstanceConfigMap().get(qualifiedNodes.get(i))
+          .setWeight(instancePriority[i] - baseline + MIN_INSTANCE_WEIGHT);
+    }
+
+    // Trigger the rebalance for a single partition only.
+    // Note that if we did it for the whole resource,
+    // the result would be inaccurate since the pending assignments would not be propagated to the constraints.
+    super.init(_resourceName, Collections.singletonList(partitionName), _states, _maxPerNode);
+    ZNRecord partitionAssignment = super.computePartitionAssignment(qualifiedNodes, liveNodes,
+        Collections.<String, Map<String, String>>emptyMap(), clusterData);
+
+    return partitionAssignment.getListFields().get(partitionName);
+  }
+
+  private void updateConstraints(String partition, Map<String, String> pendingAssignment) {
+    if (pendingAssignment.isEmpty()) {
+      _logger.warn("No pending assignment to update. Skipping the constraint update.");
+      return;
+    }
+
+    ResourcesStateMap tempStateMap = new ResourcesStateMap();
+    tempStateMap.setState(_resourceName, new Partition(partition), pendingAssignment);
+    _logger.debug("Update constraints with pending assignment: " + tempStateMap.toString());
+
+    for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) {
+      hardConstraint.updateAssignment(tempStateMap);
+    }
+    for (AbstractRebalanceSoftConstraint softConstraint : _softConstraints) {
+      softConstraint.updateAssignment(tempStateMap);
+    }
+  }
+}

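Wiring it together, the strategy can be constructed from any mix of the constraints above (a sketch reusing the hypothetical providers; the rebalance tool would then call init(...) and computePartitionAssignment(...) with real cluster data):

import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;
import org.apache.helix.controller.rebalancer.strategy.ConstraintRebalanceStrategy;

import java.util.Collections;

public class ConstraintStrategyDemo {
  public static void main(String[] args) {
    StaticPartitionWeightProvider weightProvider = new StaticPartitionWeightProvider();
    InMemoryCapacityProvider capacityProvider = new InMemoryCapacityProvider(
        Collections.singletonMap("node1", 100),
        Collections.singletonMap("node1", 40));

    // Hard constraint: never exceed total capacity.
    TotalCapacityConstraint hardConstraint =
        new TotalCapacityConstraint(weightProvider, capacityProvider);
    // Soft constraint: prefer participants with more remaining capacity.
    PartitionWeightAwareEvennessConstraint softConstraint =
        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);

    ConstraintRebalanceStrategy strategy = new ConstraintRebalanceStrategy(
        Collections.singletonList(hardConstraint),
        Collections.singletonList(softConstraint));
    // strategy.init(resourceName, partitions, states, maxPerNode);
    // ZNRecord assignment = strategy.computePartitionAssignment(allNodes, liveNodes,
    //     currentMapping, clusterDataCache);
  }
}
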
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
index 4470094..ec3adcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
@@ -8,6 +8,12 @@ import java.util.*;
 public class CardDealingAdjustmentAlgorithm {
   private static int MAX_ADJUSTMENT = 2;
 
+  public enum Mode {
+    MINIMIZE_MOVEMENT,
+    EVENNESS
+  }
+
+  private Mode _mode;
   private int _replica;
   // Instance -> FaultZone Tag
   private Map<String, String> _instanceFaultZone = new HashMap<>();
@@ -17,7 +23,8 @@ public class CardDealingAdjustmentAlgorithm {
   // Record existing partitions that are assigned to a fault zone
   private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>();
 
-  public CardDealingAdjustmentAlgorithm(Topology topology, int replica) {
+  public CardDealingAdjustmentAlgorithm(Topology topology, int replica, Mode mode) {
+    _mode = mode;
     _replica = replica;
     // Get all instance related information.
     for (Node zone : topology.getFaultZones()) {
@@ -64,15 +71,19 @@ public class CardDealingAdjustmentAlgorithm {
       targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions);
     }
 
-    // Calculate the expected spikes
-    // Assign spikes to each zone according to zone weight
-    int totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
+    int totalOverflows = 0;
     Map<String, Integer> maxZoneOverflows = new HashMap<>();
-    for (String faultZoneName : _faultZoneWeight.keySet()) {
-      float zoneWeight = _faultZoneWeight.get(faultZoneName);
-      maxZoneOverflows.put(faultZoneName,
-          (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
+    if (_mode.equals(Mode.MINIMIZE_MOVEMENT)) {
+      // Calculate the expected spikes
+      // Assign spikes to each zone according to zone weight
+      totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
+      for (String faultZoneName : _faultZoneWeight.keySet()) {
+        float zoneWeight = _faultZoneWeight.get(faultZoneName);
+        maxZoneOverflows.put(faultZoneName,
+            (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
+      }
     }
+    // Note that keeping the spikes when possible hurts evenness, so we only do this in MINIMIZE_MOVEMENT mode.
 
     Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator();
     while (nodeIter.hasNext()) {
@@ -97,8 +108,8 @@ public class CardDealingAdjustmentAlgorithm {
       List<String> partitions = nodeToPartitionMap.get(instance);
       int target = (int) (Math.floor(targetPartitionCount.get(instance)));
       if (partitions.size() > target) {
-        int maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
-        if (maxZoneOverflow > 0 && totalOverflows > 0) {
+        Integer maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
+        if (maxZoneOverflow != null && maxZoneOverflow > 0 && totalOverflows > 0) {
           // When fault zone has overflow capacity AND there are still remaining overflow partitions
           target = (int) (Math.ceil(targetPartitionCount.get(instance)));
           maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1);
@@ -131,7 +142,7 @@ public class CardDealingAdjustmentAlgorithm {
   private void partitionDealing(Collection<String> instances,
       TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap,
       Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap,
-      Map<String, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) {
+      final Map<String, Float> targetPartitionCount, final int randomSeed, final int targetAdjustment) {
     PriorityQueue<String> instanceQueue =
         new PriorityQueue<>(instances.size(), new Comparator<String>() {
           @Override
@@ -139,8 +150,13 @@ public class CardDealingAdjustmentAlgorithm {
             int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0;
             int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0;
             if (node1Load == node2Load) {
-              return new Integer((node1 + randomSeed).hashCode())
-                  .compareTo((node2 + randomSeed).hashCode());
+              Float node1Target = targetPartitionCount.get(node1);
+              Float node2Target = targetPartitionCount.get(node2);
+              if (node1Target.equals(node2Target)) {
+                return Integer.compare((node1 + randomSeed).hashCode(), (node2 + randomSeed).hashCode());
+              } else {
+                return node2Target.compareTo(node1Target);
+              }
             } else {
               return node1Load - node2Load;
             }

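As a quick sanity check on the MINIMIZE_MOVEMENT branch above, here is a worked example of the overflow ("spike") budget with hypothetical numbers (the class name SpikeBudgetExample is made up for illustration):

public class SpikeBudgetExample {
  public static void main(String[] args) {
    // Hypothetical cluster: 100 replicas over 9 instances in 3 equally weighted zones.
    float totalReplicaCount = 100f;
    int instanceCount = 9;     // plays the role of _instanceFaultZone.size()
    float zoneWeight = 3f;     // weight of a single zone
    float totalWeight = 9f;    // plays the role of _totalWeight

    // MINIMIZE_MOVEMENT: the remainder replicas form the global spike budget.
    int totalOverflows = (int) totalReplicaCount % instanceCount;  // 100 % 9 = 1

    // Each zone gets its weighted share of the budget, rounded up.
    int maxZoneOverflow =
        (int) Math.ceil(((float) totalOverflows) * zoneWeight / totalWeight);  // ceil(1 * 3 / 9) = 1

    System.out.println(totalOverflows + " total overflow(s), at most " + maxZoneOverflow + " per zone.");
    // In EVENNESS mode totalOverflows stays 0 and maxZoneOverflows stays empty, so the
    // null check on maxZoneOverflow caps every instance at floor(target) instead.
  }
}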
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
new file mode 100644
index 0000000..b47a2ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
@@ -0,0 +1,36 @@
+package org.apache.helix.controller.rebalancer.util;
+
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.model.Partition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ResourceUsageCalculator {
+  /**
+   * A convenience method for calculating each participant's capacity usage based on an assignment and a partition weight provider.
+   *
+   * @param resourceAssignment the resource assignment to compute usage from
+   * @param weightProvider provides the weight of each partition
+   * @return a map of participant name to total assigned partition weight
+   */
+  public static Map<String, Integer> getResourceUsage(ResourcesStateMap resourceAssignment,
+      PartitionWeightProvider weightProvider) {
+    Map<String, Integer> newParticipantUsage = new HashMap<>();
+    for (String resource : resourceAssignment.resourceSet()) {
+      Map<Partition, Map<String, String>> stateMap =
+          resourceAssignment.getPartitionStateMap(resource).getStateMap();
+      for (Partition partition : stateMap.keySet()) {
+        for (String participant : stateMap.get(partition).keySet()) {
+          if (!newParticipantUsage.containsKey(participant)) {
+            newParticipantUsage.put(participant, 0);
+          }
+          newParticipantUsage.put(participant, newParticipantUsage.get(participant) + weightProvider
+              .getPartitionWeight(resource, partition.getPartitionName()));
+        }
+      }
+    }
+    return newParticipantUsage;
+  }
+}

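A short usage sketch for the new utility follows. The resource and node names are hypothetical; the assignment is built with the same ResourcesStateMap.setState() API that ConstraintRebalanceStrategy uses above:

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
import org.apache.helix.model.Partition;

public class ResourceUsageCalculatorSketch {
  public static void main(String[] args) {
    // Hypothetical assignment: partition "db_0" hosted on node1 (MASTER) and node2 (SLAVE).
    ResourcesStateMap assignment = new ResourcesStateMap();
    Map<String, String> stateMap = new HashMap<>();
    stateMap.put("node1", "MASTER");
    stateMap.put("node2", "SLAVE");
    assignment.setState("db", new Partition("db_0"), stateMap);

    // Every partition weighs 10 units in this sketch.
    PartitionWeightProvider weightProvider = new PartitionWeightProvider() {
      @Override
      public int getPartitionWeight(String resource, String partition) {
        return 10;
      }
    };

    // Prints each participant's accumulated weight, e.g. {node1=10, node2=10}.
    System.out.println(ResourceUsageCalculator.getResourceUsage(assignment, weightProvider));
  }
}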
http://git-wip-us.apache.org/repos/asf/helix/blob/3d2d57b0/helix-core/src/main/java/org/apache/helix/examples/WeightAwareRebalanceUtilExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/WeightAwareRebalanceUtilExample.java b/helix-core/src/main/java/org/apache/helix/examples/WeightAwareRebalanceUtilExample.java
new file mode 100644
index 0000000..b35aba1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/WeightAwareRebalanceUtilExample.java
@@ -0,0 +1,302 @@
+package org.apache.helix.examples;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
+import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
+import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint;
+import org.apache.helix.controller.rebalancer.constraint.TotalCapacityConstraint;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedCapacityProvider;
+import org.apache.helix.controller.rebalancer.constraint.dataprovider.ZkBasedPartitionWeightProvider;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.util.WeightAwareRebalanceUtil;
+
+import java.util.*;
+
+public class WeightAwareRebalanceUtilExample {
+  private static String ZK_ADDRESS = "localhost:2199";
+  private static String CLUSTER_NAME = "RebalanceUtilExampleCluster";
+  private static HelixAdmin admin;
+
+  final static String resourceNamePrefix = "resource";
+  final static int nParticipants = 9;
+  final static int nResources = 10;
+  final static int nPartitions = 10;
+  final static int nReplicas = 3;
+  final static int defaultCapacity = 500; // total = 500*9 = 4500
+  final static int resourceWeight = 10; // total = 10*10*3*10 = 3000
+
+  final static List<String> resourceNames = new ArrayList<>();
+  final static List<String> instanceNames = new ArrayList<>();
+  final static List<String> partitions = new ArrayList<>(nPartitions);
+  final static List<ResourceConfig> resourceConfigs = new ArrayList<>();
+
+  final static ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+  final static List<InstanceConfig> instanceConfigs = new ArrayList<>();
+
+  private static void printAssignmentInfo(ResourcesStateMap assignment) {
+    System.out.println("The result assignment is: ");
+    Map<String, Integer> instanceLoad = new HashMap<>();
+    for (String resource : assignment.getResourceStatesMap().keySet()) {
+      System.out.println(resource + ": " + assignment.getPartitionStateMap(resource).toString());
+      for (Map<String, String> stateMap : assignment.getPartitionStateMap(resource).getStateMap().values()) {
+        for (String instance : stateMap.keySet()) {
+          if (!instanceLoad.containsKey(instance)) {
+            instanceLoad.put(instance, 0);
+          }
+          instanceLoad.put(instance, instanceLoad.get(instance) + resourceWeight);
+        }
+      }
+    }
+    System.out.println("Instance load: " + instanceLoad + "\n");
+  }
+
+  private static void rebalanceUtilUsage() {
+    System.out.println(String.format("Start rebalancing using WeightAwareRebalanceUtil for %d resources.", nResources));
+
+    /**
+     * Init providers with resource weight and participant capacity.
+     *
+     * Users should have their own logic to determine this information for their applications.
+     */
+    PartitionWeightProvider weightProvider = new PartitionWeightProvider() {
+      @Override
+      public int getPartitionWeight(String resource, String partition) {
+        return resourceWeight;
+      }
+    };
+    CapacityProvider capacityProvider = new CapacityProvider() {
+      @Override
+      public int getParticipantCapacity(String participant) {
+        return defaultCapacity;
+      }
+
+      @Override
+      public int getParticipantUsage(String participant) {
+        return 0;
+      }
+    };
+
+    /**
+     * Init constraints with the providers.
+     * In this example, we use 2 constraints:
+     *
+     * capacityConstraint ensures the participant capacity limitation is not exceeded.
+     * evenConstraint ensures the distribution is even across all eligible participants.
+     */
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    /**
+     * Call the util to calculate partition assignment
+     *
+     * Note that clusterConfig and instanceConfigs are predefined with minimized configuration set.
+     * Users can also read this information from cluster's ZK nodes using ConfigAccessor.
+     * @see org.apache.helix.ConfigAccessor#getInstanceConfig(String, String)
+     * @see org.apache.helix.ConfigAccessor#getClusterConfig(String)
+     */
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+
+    /**
+     * For a completely new assignment, call buildIncrementalRebalanceAssignment with no existingAssignment specified.
+     *
+     * Note that the user needs to build resourceConfigs before using the tool.
+     * If the config exists in ZK, the same object can be used directly.
+     * @see org.apache.helix.ConfigAccessor#getResourceConfig(String, String)
+     */
+    ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null,
+        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+    System.out.println(String.format("Finished rebalancing using WeightAwareRebalanceUtil for %d resources.", nResources));
+    printAssignmentInfo(assignment);
+  }
+
+  private static void rebalanceUtilUsageWithZkBasedDataProvider() {
+    System.out.println(String.format("Start rebalancing using WeightAwareRebalanceUtil and ZK based Capacity/Weight data providers for %d resources.", nResources));
+
+    // Init a zkserver & cluster nodes for this example
+    ZkServer zkServer = ExampleHelper.startZkServer(ZK_ADDRESS);
+    admin = new ZKHelixAdmin(ZK_ADDRESS);
+    admin.addCluster(CLUSTER_NAME, true);
+
+    /**
+     * To avoid re-constructing capacity / usage information every time, the user can choose to use ZK based providers.
+     * In this example, we assume the evaluated metrics are QPS and memory,
+     * in which case 2 sets of constraints are needed.
+     *
+     * Init and persist the ZK based data providers:
+     * 1. Create ZK based providers and init them with capacity / weight information.
+     * 2. Persist the providers into the HelixPropertyStore.
+     * 3. Read them back from ZK when necessary.
+     */
+    // For QPS
+    ZkBasedPartitionWeightProvider qpsWeightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDRESS, CLUSTER_NAME, "QPS");
+    // Note that user can specify more detailed weight info for each partition.
+    qpsWeightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+    ZkBasedCapacityProvider qpsCapacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDRESS, CLUSTER_NAME, "QPS");
+    qpsCapacityProvider.updateCapacity(Collections.EMPTY_MAP, Collections.EMPTY_MAP, defaultCapacity);
+    // For Memory
+    ZkBasedPartitionWeightProvider memoryWeightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDRESS, CLUSTER_NAME, "MEM");
+    // Note that user can specify more detailed weight info for each partition.
+    memoryWeightProvider.updateWeights(Collections.EMPTY_MAP, Collections.EMPTY_MAP, resourceWeight);
+    ZkBasedCapacityProvider memoryCapacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDRESS, CLUSTER_NAME, "MEM");
+    memoryCapacityProvider.updateCapacity(Collections.EMPTY_MAP, Collections.EMPTY_MAP, defaultCapacity);
+
+    // Persist providers
+    qpsCapacityProvider.persistCapacity();
+    qpsWeightProvider.persistWeights();
+    memoryCapacityProvider.persistCapacity();
+    memoryWeightProvider.persistWeights();
+
+    /**
+     * Init constraints with the ZK based data providers:
+     * 1. Read the providers back from ZK by constructing them with the same ZK address, cluster name, and dimension name.
+     * 2. Build constraints with the providers. Only soft constraints are used here for simplicity.
+     */
+    qpsWeightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDRESS, CLUSTER_NAME, "QPS");
+    qpsCapacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDRESS, CLUSTER_NAME, "QPS");
+    memoryWeightProvider =
+        new ZkBasedPartitionWeightProvider(ZK_ADDRESS, CLUSTER_NAME, "MEM");
+    memoryCapacityProvider =
+        new ZkBasedCapacityProvider(ZK_ADDRESS, CLUSTER_NAME, "MEM");
+
+    // !WARNING! Don't mix providers that supply different types of data in the same constraint.
+    PartitionWeightAwareEvennessConstraint qpsConstraint =
+        new PartitionWeightAwareEvennessConstraint(qpsWeightProvider, qpsCapacityProvider);
+    PartitionWeightAwareEvennessConstraint memoryConstraint =
+        new PartitionWeightAwareEvennessConstraint(memoryWeightProvider, memoryCapacityProvider);
+
+    List<AbstractRebalanceSoftConstraint> softConstraints = new ArrayList<>();
+    softConstraints.add(qpsConstraint);
+    softConstraints.add(memoryConstraint);
+
+    /**
+     * Call util to calculate partition assignment.
+     * Here, the same simple config set is used for the example. Users can always customize the configs.
+     */
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+    ResourcesStateMap assignment =
+        util.buildIncrementalRebalanceAssignment(resourceConfigs, null, Collections.EMPTY_LIST,
+            softConstraints);
+
+    ExampleHelper.stopZkServer(zkServer);
+
+    System.out.println(String.format("Finished rebalancing using WeightAwareRebalanceUtil and ZK based Capacity/Weight data providers for %d resources.", nResources));
+    printAssignmentInfo(assignment);
+  }
+
+  private static void rebalanceWithFaultZone() {
+    System.out.println(String.format("Start rebalancing using WeightAwareRebalanceUtil for %d resources in a topology aware cluster.", nResources));
+
+    /**
+     * Set up the cluster config and instance configs with topology information:
+     * 1. enable topology aware rebalancing
+     * 2. set up the topology layout and fault zone type
+     * 3. configure domain info for each instance
+     *
+     * If this information is already in ZK, the user can read it from ZK directly.
+     * @see org.apache.helix.ConfigAccessor#getClusterConfig(String)
+     * @see org.apache.helix.ConfigAccessor#getInstanceConfig(String, String)
+     */
+    ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/Rack/Host");
+    clusterConfig.setFaultZoneType("Rack");
+
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    for (int i = 0; i < instanceNames.size(); i++) {
+      String instance = instanceNames.get(i);
+      InstanceConfig config = new InstanceConfig(instance);
+      String domainStr = String.format("Rack=%s,Host=%s", i % (nParticipants / 3), instance);
+      config.setDomain(domainStr);
+      instanceConfigs.add(config);
+      System.out.println(String.format("Set instance %s domain to be %s.", instance, domainStr));
+    }
+
+    /**
+     * Init constraints in the same way as the previous examples.
+     */
+    PartitionWeightProvider weightProvider = new PartitionWeightProvider() {
+      @Override
+      public int getPartitionWeight(String resource, String partition) {
+        return resourceWeight;
+      }
+    };
+    CapacityProvider capacityProvider = new CapacityProvider() {
+      @Override
+      public int getParticipantCapacity(String participant) {
+        return defaultCapacity;
+      }
+
+      @Override
+      public int getParticipantUsage(String participant) {
+        return 0;
+      }
+    };
+    TotalCapacityConstraint capacityConstraint =
+        new TotalCapacityConstraint(weightProvider, capacityProvider);
+    PartitionWeightAwareEvennessConstraint evenConstraint =
+        new PartitionWeightAwareEvennessConstraint(weightProvider, capacityProvider);
+
+    /**
+     * Call the util to calculate partition assignment
+     */
+    WeightAwareRebalanceUtil util = new WeightAwareRebalanceUtil(clusterConfig, instanceConfigs);
+
+    ResourcesStateMap assignment = util.buildIncrementalRebalanceAssignment(resourceConfigs, null,
+        Collections.<AbstractRebalanceHardConstraint>singletonList(capacityConstraint),
+        Collections.<AbstractRebalanceSoftConstraint>singletonList(evenConstraint));
+
+    System.out.println(String.format("Finished rebalancing using WeightAwareRebalanceUtil for %d resources in a topology aware cluster with %d fault zones.", nResources, nParticipants / 3));
+    printAssignmentInfo(assignment);
+  }
+
+  private static void setup() {
+    for (int i = 0; i < nParticipants; i++) {
+      instanceNames.add("node" + i);
+    }
+    for (int i = 0; i < nPartitions; i++) {
+      partitions.add(Integer.toString(i));
+    }
+
+    for (int i = 0; i < nResources; i++) {
+      String resourceName = resourceNamePrefix + i;
+      resourceNames.add(resourceName);
+      ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceName);
+      resourceBuilder.setStateModelDefRef("MasterSlave");
+      resourceBuilder.setNumReplica(nReplicas);
+      for (String partition : partitions) {
+        resourceBuilder.setPreferenceList(partition, Collections.EMPTY_LIST);
+      }
+      resourceConfigs.add(resourceBuilder.build());
+    }
+
+    for (String instance : instanceNames) {
+      InstanceConfig config = new InstanceConfig(instance);
+      instanceConfigs.add(config);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    setup();
+    rebalanceUtilUsage();
+    rebalanceUtilUsageWithZkBasedDataProvider();
+    rebalanceWithFaultZone();
+  }
+}

