helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [48/53] [abbrv] git commit: [HELIX-209] Shuffling around rebalancer code to allow for compatibility
Date Thu, 07 Nov 2013 01:19:56 GMT
[HELIX-209] Shuffling around rebalancer code to allow for compatibility


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5405df1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5405df1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5405df1e

Branch: refs/heads/master
Commit: 5405df1e1decac136d5549d047bfdd7c0412a407
Parents: 142c924
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Oct 15 16:49:46 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Nov 6 13:17:38 2013 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  40 +--
 .../controller/rebalancer/AutoRebalancer.java   | 187 -----------
 .../controller/rebalancer/CustomRebalancer.java | 144 ++++-----
 .../rebalancer/FullAutoRebalancer.java          | 196 ++++++++++++
 .../controller/rebalancer/HelixRebalancer.java  |  64 ++++
 .../helix/controller/rebalancer/Rebalancer.java |   8 +-
 .../controller/rebalancer/RebalancerRef.java    |  94 ++++++
 .../rebalancer/SemiAutoRebalancer.java          |  96 +++---
 .../context/BasicRebalancerContext.java         |   1 +
 .../rebalancer/context/CustomRebalancer.java    | 123 -------
 .../context/CustomRebalancerContext.java        |   6 +-
 .../rebalancer/context/FullAutoRebalancer.java  | 194 ------------
 .../context/FullAutoRebalancerContext.java      |   2 +
 .../context/PartitionedRebalancerContext.java   |   1 +
 .../rebalancer/context/Rebalancer.java          |  38 ---
 .../rebalancer/context/RebalancerConfig.java    |   7 +-
 .../rebalancer/context/RebalancerContext.java   |   1 +
 .../rebalancer/context/RebalancerRef.java       |  94 ------
 .../rebalancer/context/SemiAutoRebalancer.java  |  81 -----
 .../context/SemiAutoRebalancerContext.java      |   8 +-
 .../util/ConstraintBasedAssignment.java         | 180 +++++++----
 .../util/NewConstraintBasedAssignment.java      | 244 --------------
 .../stages/BestPossibleStateCalcStage.java      | 143 +++++----
 .../stages/BestPossibleStateOutput.java         |  91 ++----
 .../stages/CompatibilityCheckStage.java         |  17 +-
 .../stages/CurrentStateComputationStage.java    |  91 +++---
 .../stages/ExternalViewComputeStage.java        | 154 +++++----
 .../stages/MessageGenerationPhase.java          | 215 -------------
 .../stages/MessageGenerationStage.java          | 213 +++++++++++++
 .../helix/controller/stages/MessageOutput.java  |  79 +++++
 .../stages/MessageSelectionStage.java           | 131 +++++---
 .../stages/MessageSelectionStageOutput.java     |  60 ----
 .../controller/stages/MessageThrottleStage.java |  41 ++-
 .../stages/MessageThrottleStageOutput.java      |  53 ----
 .../stages/NewBestPossibleStateCalcStage.java   | 142 ---------
 .../stages/NewBestPossibleStateOutput.java      |  49 ---
 .../stages/NewCompatibilityCheckStage.java      |  68 ----
 .../stages/NewCurrentStateComputationStage.java | 142 ---------
 .../stages/NewExternalViewComputeStage.java     | 281 ----------------
 .../stages/NewMessageGenerationStage.java       | 213 -------------
 .../controller/stages/NewMessageOutput.java     |  79 -----
 .../stages/NewMessageSelectionStage.java        | 317 -------------------
 .../stages/NewMessageThrottleStage.java         | 198 ------------
 .../stages/NewReadClusterDataStage.java         |  73 -----
 .../stages/NewResourceComputationStage.java     | 138 --------
 .../stages/NewTaskAssignmentStage.java          | 151 ---------
 .../stages/PersistAssignmentStage.java          |   2 +-
 .../controller/stages/ReadClusterDataStage.java |  50 +--
 .../controller/stages/ReadHealthDataStage.java  |   2 -
 .../stages/RebalanceIdealStateStage.java        |  84 -----
 .../stages/ResourceComputationStage.java        | 164 +++++-----
 .../controller/stages/TaskAssignmentStage.java  |  57 ++--
 .../java/org/apache/helix/model/IdealState.java |  23 +-
 .../java/org/apache/helix/model/Message.java    |   2 +-
 .../apache/helix/model/ResourceAssignment.java  |  34 +-
 .../helix/model/StateModelDefinition.java       |  28 +-
 .../helix/model/builder/AutoModeISBuilder.java  |  62 +++-
 .../builder/AutoRebalanceModeISBuilder.java     |  32 +-
 .../builder/ClusterConstraintsBuilder.java      |   8 +-
 .../model/builder/CurrentStateBuilder.java      |  11 +-
 .../model/builder/CustomModeISBuilder.java      |  61 +++-
 .../model/builder/ExternalViewBuilder.java      |  24 --
 .../helix/tools/ClusterStateVerifier.java       |  20 +-
 .../org/apache/helix/api/TestNewStages.java     |   8 +-
 .../TestBestPossibleCalcStageCompatibility.java |  12 +-
 .../stages/TestBestPossibleStateCalcStage.java  |   6 +-
 .../stages/TestCompatibilityCheckStage.java     |  10 +-
 .../TestCurrentStateComputationStage.java       |  12 +-
 .../stages/TestMessageThrottleStage.java        |  20 +-
 .../stages/TestMsgSelectionStage.java           |   6 +-
 .../stages/TestRebalancePipeline.java           |  54 ++--
 .../stages/TestResourceComputationStage.java    |  14 +-
 .../strategy/TestAutoRebalanceStrategy.java     |   6 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   6 +-
 .../TestCustomizedIdealStateRebalancer.java     |   4 +-
 75 files changed, 1698 insertions(+), 4072 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b27398d..eec745e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -45,16 +45,16 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
-import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
-import org.apache.helix.controller.stages.NewExternalViewComputeStage;
-import org.apache.helix.controller.stages.NewMessageGenerationStage;
-import org.apache.helix.controller.stages.NewMessageSelectionStage;
-import org.apache.helix.controller.stages.NewMessageThrottleStage;
-import org.apache.helix.controller.stages.NewReadClusterDataStage;
-import org.apache.helix.controller.stages.NewResourceComputationStage;
-import org.apache.helix.controller.stages.NewTaskAssignmentStage;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.CompatibilityCheckStage;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ExternalViewComputeStage;
+import org.apache.helix.controller.stages.MessageGenerationStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -175,23 +175,23 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
       // cluster data cache refresh
       Pipeline dataRefresh = new Pipeline();
-      dataRefresh.addStage(new NewReadClusterDataStage());
+      dataRefresh.addStage(new ReadClusterDataStage());
 
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline();
-      rebalancePipeline.addStage(new NewCompatibilityCheckStage());
-      rebalancePipeline.addStage(new NewResourceComputationStage());
-      rebalancePipeline.addStage(new NewCurrentStateComputationStage());
-      rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new CompatibilityCheckStage());
+      rebalancePipeline.addStage(new ResourceComputationStage());
+      rebalancePipeline.addStage(new CurrentStateComputationStage());
+      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
       rebalancePipeline.addStage(new PersistAssignmentStage());
-      rebalancePipeline.addStage(new NewMessageGenerationStage());
-      rebalancePipeline.addStage(new NewMessageSelectionStage());
-      rebalancePipeline.addStage(new NewMessageThrottleStage());
-      rebalancePipeline.addStage(new NewTaskAssignmentStage());
+      rebalancePipeline.addStage(new MessageGenerationStage());
+      rebalancePipeline.addStage(new MessageSelectionStage());
+      rebalancePipeline.addStage(new MessageThrottleStage());
+      rebalancePipeline.addStage(new TaskAssignmentStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline();
-      externalViewPipeline.addStage(new NewExternalViewComputeStage());
+      externalViewPipeline.addStage(new ExternalViewComputeStage());
 
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
deleted file mode 100644
index 880f2c0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
- * state of a resource, fully adapting to the addition or removal of instances. This includes
- * computation of a new preference list and a partition to instance and state mapping based on the
- * computed instance preferences.
- * The input is the current assignment of partitions to instances, as well as existing instance
- * preferences, if any.
- * The output is a preference list and a mapping based on that preference list, i.e. partition p
- * has a replica on node k with state s.
- */
-@Deprecated
-public class AutoRebalancer implements Rebalancer {
-  // These should be final, but are initialized in init rather than a constructor
-  private HelixManager _manager;
-  private AutoRebalanceStrategy _algorithm;
-
-  private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager manager) {
-    this._manager = manager;
-    this._algorithm = null;
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    // Compute a preference list based on the current ideal state
-    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
-    String stateModelName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
-    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
-    String replicas = currentIdealState.getReplicas();
-
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    stateCountMap =
-        ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
-            Integer.parseInt(replicas));
-    List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
-    Map<String, Map<String, String>> currentMapping =
-        currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
-
-    // If there are nodes tagged with resource name, use only those nodes
-    Set<String> taggedNodes = new HashSet<String>();
-    if (currentIdealState.getInstanceGroupTag() != null) {
-      for (String instanceName : liveNodes) {
-        if (clusterData.getInstanceConfigMap().get(instanceName)
-            .containsTag(currentIdealState.getInstanceGroupTag())) {
-          taggedNodes.add(instanceName);
-        }
-      }
-    }
-    if (taggedNodes.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
-            + " " + taggedNodes);
-      }
-      liveNodes = new ArrayList<String>(taggedNodes);
-    }
-
-    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
-    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveNodes);
-      LOG.info("allNodes: " + allNodes);
-      LOG.info("maxPartition: " + maxPartition);
-    }
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    placementScheme.init(_manager);
-    _algorithm =
-        new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap,
-            maxPartition, placementScheme);
-    ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
-    }
-
-    IdealState newIdealState = new IdealState(resource.getResourceName());
-    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
-    newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
-    newIdealState.getRecord().setListFields(newMapping.getListFields());
-
-    // compute a full partition mapping for the resource
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
-    }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (String partitionName : partitions) {
-      Partition partition = new Partition(partitionName);
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
-              stateModelDef);
-      Map<String, String> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partitionName),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
-    }
-    return partitionMapping;
-  }
-
-  private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
-      String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
-
-    Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
-
-    for (String partition : partitions) {
-      Map<String, String> curStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
-      map.put(partition, new HashMap<String, String>());
-      for (String node : curStateMap.keySet()) {
-        String state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-
-      Map<String, String> pendingStateMap =
-          currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
-      for (String node : pendingStateMap.keySet()) {
-        String state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-    }
-    return map;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index f6ea60f..69c379a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -1,5 +1,23 @@
 package org.apache.helix.controller.rebalancer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,117 +37,89 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
- * partitions against the set of live instances to mark assignment states as dropped or erroneous
- * as necessary.
- * The input is the required current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a verified mapping based on that preference list, i.e. partition p has a replica
- * on node k with state s, where s may be a dropped or error state if necessary.
- */
-@Deprecated
-public class CustomRebalancer implements Rebalancer {
+public class CustomRebalancer implements HelixRebalancer {
 
   private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
 
   @Override
-  public void init(HelixManager manager) {
+  public void init(HelixManager helixManager) {
+    // do nothing
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    String stateModelDefName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    CustomRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      Map<String, String> idealStateMap =
-          IdealState.stringMapFromParticipantStateMap(currentIdealState
-              .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
-      Map<String, String> bestStateForPartition =
-          computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
-              currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      Map<ParticipantId, State> bestStateForPartition =
+          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
 
   /**
-   * compute best state for resource in CUSTOMIZED ideal state mode
-   * @param cache
+   * compute best state for resource in CUSTOMIZED rebalancer mode
+   * @param liveParticipantSet
    * @param stateModelDef
-   * @param idealStateMap
+   * @param preferenceMap
    * @param currentStateMap
-   * @param disabledInstancesForPartition
+   * @param disabledParticipantsForPartition
    * @return
    */
-  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
-      StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
+  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // if the resource is deleted, preferenceMap will be null/empty and
     // we should drop all resources.
     if (currentStateMap != null) {
-      for (String instance : currentStateMap.keySet()) {
-        if ((idealStateMap == null || !idealStateMap.containsKey(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
+            && !disabledParticipantsForPartition.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
       }
     }
 
     // ideal state is deleted
-    if (idealStateMap == null) {
-      return instanceStateMap;
+    if (preferenceMap == null) {
+      return participantStateMap;
     }
 
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-    for (String instance : idealStateMap.keySet()) {
+    for (ParticipantId participantId : preferenceMap.keySet()) {
       boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(instance) == null
-              || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+          currentStateMap == null || currentStateMap.get(participantId) == null
+              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
 
-      if (liveInstancesMap.containsKey(instance) && notInErrorState
-          && !disabledInstancesForPartition.contains(instance)) {
-        instanceStateMap.put(instance, idealStateMap.get(instance));
+      if (liveParticipantSet.contains(participantId) && notInErrorState
+          && !disabledParticipantsForPartition.contains(participantId)) {
+        participantStateMap.put(participantId, preferenceMap.get(participantId));
       }
     }
 
-    return instanceStateMap;
+    return participantStateMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
new file mode 100644
index 0000000..d0a96cc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -0,0 +1,196 @@
+package org.apache.helix.controller.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class FullAutoRebalancer implements HelixRebalancer {
+  // Should be final, but it is assigned in computeResourceMapping rather than in a constructor
+  private AutoRebalanceStrategy _algorithm;
+
+  private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager helixManager) {
+    // do nothing
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    FullAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
+    // Compute a preference list based on the current ideal state
+    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+    int replicas = -1;
+    if (config.anyLiveParticipant()) {
+      replicas = liveParticipants.size();
+    } else {
+      replicas = config.getReplicaCount();
+    }
+
+    // count how many replicas should be in each state
+    Map<State, String> upperBounds =
+        ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+            cluster.getConfig());
+    LinkedHashMap<State, Integer> stateCountMap =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
+            liveParticipants.size(), replicas);
+
+    // get the participant lists
+    List<ParticipantId> liveParticipantList =
+        new ArrayList<ParticipantId>(liveParticipants.keySet());
+    List<ParticipantId> allParticipantList =
+        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+
+    // compute the current mapping from the current state
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+        currentMapping(config, currentState, stateCountMap);
+
+    // If any live participants carry the resource's participant group tag, use only those nodes
+    Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
+    if (config.getParticipantGroupTag() != null) {
+      for (ParticipantId participantId : liveParticipantList) {
+        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
+          taggedNodes.add(participantId);
+        }
+      }
+    }
+    if (taggedNodes.size() > 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + config.getResourceId() + " "
+            + taggedNodes);
+      }
+      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
+    }
+
+    // determine which nodes the replicas should live on
+    int maxPartition = config.getMaxPartitionsPerParticipant();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveParticipantList);
+      LOG.info("allNodes: " + allParticipantList);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    _algorithm =
+        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+            placementScheme);
+    ZNRecord newMapping =
+        _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+            allParticipantList);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + config.getResourceId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : partitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+      if (rawPreferenceList == null) {
+        rawPreferenceList = Collections.emptyList();
+      }
+      List<ParticipantId> preferenceList =
+          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+            @Override
+            public ParticipantId apply(String participantName) {
+              return ParticipantId.from(participantName);
+            }
+          });
+      preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+      Map<ParticipantId, State> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+              liveParticipants.keySet(), stateModelDef, preferenceList,
+              currentState.getCurrentStateMap(config.getResourceId(), partition),
+              disabledParticipantsForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+      Map<State, Integer> stateCountMap) {
+    Map<PartitionId, Map<ParticipantId, State>> map =
+        new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> curStateMap =
+          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+      map.put(partition, new HashMap<ParticipantId, State>());
+      for (ParticipantId node : curStateMap.keySet()) {
+        State state = curStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+
+      Map<ParticipantId, State> pendingStateMap =
+          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+      for (ParticipantId node : pendingStateMap.keySet()) {
+        State state = pendingStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+    }
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
new file mode 100644
index 0000000..7fcbba5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Allows one to come up with custom implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the resource assignment for a resource in this method.<br/>
+ */
+public interface HelixRebalancer {
+  /**
+   * Initialize the rebalancer with a HelixManager if necessary
+   * @param helixManager the HelixManager to initialize the rebalancer with
+   */
+  public void init(HelixManager helixManager);
+
+  /**
+   * Given an ideal state for a resource and liveness of participants, compute an assignment of
+   * instances and states to each partition of a resource. This method provides all the relevant
+   * information needed to rebalance a resource. If you need additional information use
+   * manager.getAccessor to read and write the cluster data. This allows one to compute the
+   * ResourceAssignment according to app-specific requirements.<br/>
+   * <br/>
+   * Say that you have:<br/>
+   * 
+   * <pre>
+   * class MyRebalancerContext implements RebalancerContext
+   * </pre>
+   * 
+   * as your rebalancer context. To extract it from a RebalancerConfig, do the following:<br/>
+   * 
+   * <pre>
+   * MyRebalancerContext context = rebalancerConfig.getRebalancerContext(MyRebalancerContext.class);
+   * </pre>
+   * @param rebalancerConfig the properties of the resource for which a mapping will be computed
+   * @param cluster complete snapshot of the cluster
+   * @param currentState the current states of all partitions
+   */
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index 5dd399f..0e6cb10 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -23,13 +23,13 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
 
 /**
  * Allows one to come up with custom implementation of a rebalancer.<br/>
  * This will be invoked on all changes that happen in the cluster.<br/>
  * Simply return the newIdealState for a resource in this method.<br/>
+ * <br/>
+ * Deprecated. Use {@link HelixRebalancer} instead.
  */
 @Deprecated
 public interface Rebalancer {
@@ -37,7 +37,7 @@ public interface Rebalancer {
    * Initialize the rebalancer with a HelixManager if necessary
    * @param manager
    */
-  void init(HelixManager manager);
+  public void init(HelixManager manager);
 
   /**
    * Given an ideal state for a resource and liveness of instances, compute a assignment of
@@ -50,7 +50,7 @@ public interface Rebalancer {
    * @param currentStateOutput the current states of all partitions
    * @param clusterData cache of the cluster state
    */
-  ResourceAssignment computeResourceMapping(final Resource resource,
+  public IdealState computeResourceMapping(final String resourceName,
       final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
       final ClusterDataCache clusterData);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
new file mode 100644
index 0000000..974222d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Reference to a class that implements {@link HelixRebalancer}. It loads the class automatically.
+ */
+public class RebalancerRef {
+  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
+  @JsonProperty("rebalancerClassName")
+  private final String _rebalancerClassName;
+
+  @JsonCreator
+  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
+    _rebalancerClassName = rebalancerClassName;
+  }
+
+  /**
+   * Get an instantiated HelixRebalancer
+   * @return HelixRebalancer or null if instantiation failed
+   */
+  @JsonIgnore
+  public HelixRebalancer getRebalancer() {
+    try {
+      return (HelixRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return _rebalancerClassName;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof RebalancerRef) {
+      return this.toString().equals(((RebalancerRef) that).toString());
+    } else if (that instanceof String) {
+      return this.toString().equals(that);
+    }
+    return false;
+  }
+
+  /**
+   * Get a rebalancer class reference
+   * @param rebalancerClassName name of the class
+   * @return RebalancerRef or null if name is null
+   */
+  public static RebalancerRef from(String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
+    return new RebalancerRef(rebalancerClassName);
+  }
+
+  /**
+   * Get a RebalancerRef from a class object
+   * @param rebalancerClass class that implements HelixRebalancer
+   * @return RebalancerRef or null if the class is null
+   */
+  public static RebalancerRef from(Class<? extends HelixRebalancer> rebalancerClass) {
+    if (rebalancerClass == null) {
+      return null;
+    }
+    return RebalancerRef.from(rebalancerClass.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index dd9fcf1..c5a7f22 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -1,5 +1,22 @@
 package org.apache.helix.controller.rebalancer;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,65 +36,48 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
 /**
- * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
- * state of a resource based on a predefined preference list of instances willing to accept
- * replicas.
- * The input is the optional current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
- * with state s.
+ * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
+ * locations of each partition replica
  */
-@Deprecated
-public class SemiAutoRebalancer implements Rebalancer {
-
+public class SemiAutoRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager manager) {
+  public void init(HelixManager helixManager) {
+    // do nothing
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    String stateModelDefName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    SemiAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, currentIdealState,
-              stateModelDef);
-      Map<String, String> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      List<ParticipantId> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cluster, partition,
+              config.getPreferenceList(partition));
+      Map<State, String> upperBounds =
+          ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+              cluster.getConfig());
+      Map<ParticipantId, State> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
index bb2ab17..ec765d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
@@ -7,6 +7,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
deleted file mode 100644
index 00219af..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class CustomRebalancer implements Rebalancer {
-
-  private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    CustomRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      Map<ParticipantId, State> bestStateForPartition =
-          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
-              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  /**
-   * compute best state for resource in CUSTOMIZED rebalancer mode
-   * @param liveParticipantMap
-   * @param stateModelDef
-   * @param preferenceMap
-   * @param currentStateMap
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
-      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
-      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, idealStateMap will be null/empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (preferenceMap == null) {
-      return participantStateMap;
-    }
-
-    for (ParticipantId participantId : preferenceMap.keySet()) {
-      boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(participantId) == null
-              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
-      if (liveParticipantSet.contains(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
-        participantStateMap.put(participantId, preferenceMap.get(participantId));
-      }
-    }
-
-    return participantStateMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
index 6e1485b..1fc1cda 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -10,7 +10,9 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -102,7 +104,7 @@ public class CustomRebalancerContext extends PartitionedRebalancerContext {
 
     // determine the preference maps
     LinkedHashMap<State, Integer> stateCounts =
-        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
             getReplicaCount());
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
deleted file mode 100644
index 521af5c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class FullAutoRebalancer implements Rebalancer {
-  // These should be final, but are initialized in init rather than a constructor
-  private AutoRebalanceStrategy _algorithm;
-
-  private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    FullAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    // Compute a preference list based on the current ideal state
-    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
-    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
-    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
-    int replicas = -1;
-    if (config.anyLiveParticipant()) {
-      replicas = liveParticipants.size();
-    } else {
-      replicas = config.getReplicaCount();
-    }
-
-    // count how many replicas should be in each state
-    Map<State, String> upperBounds =
-        NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
-            cluster.getConfig());
-    LinkedHashMap<State, Integer> stateCountMap =
-        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
-            liveParticipants.size(), replicas);
-
-    // get the participant lists
-    List<ParticipantId> liveParticipantList =
-        new ArrayList<ParticipantId>(liveParticipants.keySet());
-    List<ParticipantId> allParticipantList =
-        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
-
-    // compute the current mapping from the current state
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
-        currentMapping(config, currentState, stateCountMap);
-
-    // If there are nodes tagged with resource, use only those nodes
-    Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
-    if (config.getParticipantGroupTag() != null) {
-      for (ParticipantId participantId : liveParticipantList) {
-        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
-          taggedNodes.add(participantId);
-        }
-      }
-    }
-    if (taggedNodes.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + config.getResourceId() + " "
-            + taggedNodes);
-      }
-      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
-    }
-
-    // determine which nodes the replicas should live on
-    int maxPartition = config.getMaxPartitionsPerParticipant();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveParticipantList);
-      LOG.info("allNodes: " + allParticipantList);
-      LOG.info("maxPartition: " + maxPartition);
-    }
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    _algorithm =
-        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
-            placementScheme);
-    ZNRecord newMapping =
-        _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
-            allParticipantList);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
-    }
-
-    // compute a full partition mapping for the resource
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : partitions) {
-      Set<ParticipantId> disabledParticipantsForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
-      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
-      if (rawPreferenceList == null) {
-        rawPreferenceList = Collections.emptyList();
-      }
-      List<ParticipantId> preferenceList =
-          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
-            @Override
-            public ParticipantId apply(String participantName) {
-              return ParticipantId.from(participantName);
-            }
-          });
-      preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
-      Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
-              liveParticipants.keySet(), stateModelDef, preferenceList,
-              currentState.getCurrentStateMap(config.getResourceId(), partition),
-              disabledParticipantsForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
-      Map<State, Integer> stateCountMap) {
-    Map<PartitionId, Map<ParticipantId, State>> map =
-        new HashMap<PartitionId, Map<ParticipantId, State>>();
-
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
-      map.put(partition, new HashMap<ParticipantId, State>());
-      for (ParticipantId node : curStateMap.keySet()) {
-        State state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-
-      Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
-      for (ParticipantId node : pendingStateMap.keySet()) {
-        State state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-    }
-    return map;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
index 11a1b47..2db9ac6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
@@ -1,6 +1,8 @@
 package org.apache.helix.controller.rebalancer.context;
 
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.model.IdealState.RebalanceMode;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index 768e40c..9d24e68 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -10,6 +10,7 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.StateModelDefinition;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
deleted file mode 100644
index 6e55e63..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Allows one to come up with custom implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the resource assignment for a resource in this method.<br/>
- */
-public interface Rebalancer {
-
-  public void init(HelixManager helixManager);
-
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
-      ResourceCurrentState currentState);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index 924b8a1..64f70c3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -2,6 +2,7 @@ package org.apache.helix.controller.rebalancer.context;
 
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
@@ -38,7 +39,7 @@ public final class RebalancerConfig {
 
   private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
   private ContextSerializer _serializer;
-  private Rebalancer _rebalancer;
+  private HelixRebalancer _rebalancer;
   private final RebalancerContext _context;
   private final NamespacedConfig _config;
 
@@ -116,7 +117,7 @@ public final class RebalancerConfig {
    * Get a rebalancer class instance
    * @return Rebalancer
    */
-  public Rebalancer getRebalancer() {
+  public HelixRebalancer getRebalancer() {
     // cache the rebalancer to avoid loading and instantiating it excessively
     if (_rebalancer == null) {
       if (_context == null || _context.getRebalancerRef() == null) {
@@ -136,7 +137,7 @@ public final class RebalancerConfig {
     try {
       return contextClass.cast(_context);
     } catch (ClassCastException e) {
-      LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
+      LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
index ea35525..981891b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -8,6 +8,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
deleted file mode 100644
index a90b77a..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
- */
-public class RebalancerRef {
-  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
-
-  @JsonProperty("rebalancerClassName")
-  private final String _rebalancerClassName;
-
-  @JsonCreator
-  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
-    _rebalancerClassName = rebalancerClassName;
-  }
-
-  /**
-   * Get an instantiated Rebalancer
-   * @return Rebalancer or null if instantiation failed
-   */
-  @JsonIgnore
-  public Rebalancer getRebalancer() {
-    try {
-      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
-    } catch (Exception e) {
-      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
-    }
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return _rebalancerClassName;
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that instanceof RebalancerRef) {
-      return this.toString().equals(((RebalancerRef) that).toString());
-    } else if (that instanceof String) {
-      return this.toString().equals(that);
-    }
-    return false;
-  }
-
-  /**
-   * Get a rebalancer class reference
-   * @param rebalancerClassName name of the class
-   * @return RebalancerRef or null if name is null
-   */
-  public static RebalancerRef from(String rebalancerClassName) {
-    if (rebalancerClassName == null) {
-      return null;
-    }
-    return new RebalancerRef(rebalancerClassName);
-  }
-
-  /**
-   * Get a RebalancerRef from a class object
-   * @param rebalancerClass class that implements Rebalancer
-   * @return RebalancerRef
-   */
-  public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
-    if (rebalancerClass == null) {
-      return null;
-    }
-    return RebalancerRef.from(rebalancerClass.getName());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
deleted file mode 100644
index 3f0dd13..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
- * locations of each partition replica
- */
-public class SemiAutoRebalancer implements Rebalancer {
-  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
-              config.getPreferenceList(partition));
-      Map<State, String> upperBounds =
-          NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
-              cluster.getConfig());
-      Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
-              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
index 72b3bc7..f574a62 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -11,7 +11,9 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -108,7 +110,7 @@ public final class SemiAutoRebalancerContext extends PartitionedRebalancerContex
         Set<ParticipantId> disabledParticipants = Collections.emptySet();
         Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
         Map<ParticipantId, State> initialMap =
-            NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
                 participantSet, stateModelDef, preferenceList, emptyCurrentState,
                 disabledParticipants);
         currentMapping.put(partitionId, initialMap);
@@ -117,7 +119,7 @@ public final class SemiAutoRebalancerContext extends PartitionedRebalancerContex
 
     // determine the preference
     LinkedHashMap<State, Integer> stateCounts =
-        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
             getReplicaCount());
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);


Mime
View raw message