helix-commits mailing list archives

From ka...@apache.org
Subject [47/53] [abbrv] [HELIX-209] Shuffling around rebalancer code to allow for compatibility
Date Thu, 07 Nov 2013 01:19:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 323be34..a0ee1c1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -30,87 +30,136 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
  */
-@Deprecated
 public class ConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
 
-  public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
-      IdealState idealState, StateModelDefinition stateModelDef) {
-    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+  /**
+   * Get a set of disabled participants for a partition
+   * @param participantMap map of all participants
+   * @param partitionId the partition to check
+   * @return a set of all participants that are disabled for the partition
+   */
+  public static Set<ParticipantId> getDisabledParticipants(
+      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
+    Set<ParticipantId> disabledParticipantsForPartition =
+        Sets.filter(participantSet, new Predicate<ParticipantId>() {
+          @Override
+          public boolean apply(ParticipantId participantId) {
+            Participant participant = participantMap.get(participantId);
+            return !participant.isEnabled()
+                || participant.getDisabledPartitionIds().contains(partitionId);
+          }
+        });
+    return disabledParticipantsForPartition;
+  }
 
-    if (listField != null && listField.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
-      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+  /**
+   * Get an ordered list of participants that can serve a partition
+   * @param cluster cluster snapshot
+   * @param partitionId the partition to look up
+   * @param config rebalancing constraints
+   * @return list with most preferred participants first
+   */
+  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
+      List<ParticipantId> prefList) {
+    if (prefList != null && prefList.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
+      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
       Collections.sort(prefList);
-      return prefList;
-    } else {
-      return listField;
     }
+    return prefList;
   }
 
   /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param cache
+   * Get a map of state to upper bound constraint given a cluster
+   * @param stateModelDef the state model definition to check
+   * @param resourceId the resource that is constrained
+   * @param cluster the cluster the resource belongs to
+   * @return map of state to upper bound
+   */
+  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+      ResourceId resourceId, ClusterConfig cluster) {
+    Map<State, String> stateMap = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
+      stateMap.put(state, num);
+    }
+    return stateMap;
+  }
+
+  /**
+   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+   * @param upperBounds map of state to upper bound
+   * @param liveParticipantSet set of live participant ids
    * @param stateModelDef
-   * @param instancePreferenceList
+   * @param participantPreferenceList
    * @param currentStateMap
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
+   *          : participant->state for each partition
+   * @param disabledParticipantsForPartition
    * @return
    */
-  public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
-      StateModelDefinition stateModelDef, List<String> instancePreferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // if the resource is deleted, participantPreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
-      for (String instance : currentStateMap.keySet()) {
-        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((participantPreferenceList == null || !participantPreferenceList
+            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
       }
     }
 
-    // ideal state is deleted
-    if (instancePreferenceList == null) {
-      return instanceStateMap;
+    // resource is deleted
+    if (participantPreferenceList == null) {
+      return participantStateMap;
     }
 
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    boolean assigned[] = new boolean[instancePreferenceList.size()];
-
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+    boolean assigned[] = new boolean[participantPreferenceList.size()];
 
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       int stateCount = -1;
       if ("N".equals(num)) {
-        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
-        liveAndEnabled.removeAll(disabledInstancesForPartition);
+        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
+        liveAndEnabled.removeAll(disabledParticipantsForPartition);
         stateCount = liveAndEnabled.size();
       } else if ("R".equals(num)) {
-        stateCount = instancePreferenceList.size();
+        stateCount = participantPreferenceList.size();
       } else {
         try {
           stateCount = Integer.parseInt(num);
@@ -120,16 +169,18 @@ public class ConstraintBasedAssignment {
       }
       if (stateCount > -1) {
         int count = 0;
-        for (int i = 0; i < instancePreferenceList.size(); i++) {
-          String instanceName = instancePreferenceList.get(i);
+        for (int i = 0; i < participantPreferenceList.size(); i++) {
+          ParticipantId participantId = participantPreferenceList.get(i);
 
           boolean notInErrorState =
-              currentStateMap == null || currentStateMap.get(instanceName) == null
-                  || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
-
-          if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
-              && !disabledInstancesForPartition.contains(instanceName)) {
-            instanceStateMap.put(instanceName, state);
+              currentStateMap == null
+                  || currentStateMap.get(participantId) == null
+                  || !currentStateMap.get(participantId)
+                      .equals(State.from(HelixDefinedState.ERROR));
+
+          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
+              && !disabledParticipantsForPartition.contains(participantId)) {
+            participantStateMap.put(participantId, state);
             count = count + 1;
             assigned[i] = true;
             if (count == stateCount) {
@@ -139,24 +190,25 @@ public class ConstraintBasedAssignment {
         }
       }
     }
-    return instanceStateMap;
+    return participantStateMap;
   }
 
   /**
    * Get the number of replicas that should be in each state for a partition
+   * @param upperBounds map of state to upper bound
    * @param stateModelDef StateModelDefinition object
    * @param liveNodesNb number of live nodes
    * @param total number of replicas
    * @return state count map: state->count
    */
-  public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
-      int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
+    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
 
     int replicas = totalReplicas;
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       if ("N".equals(num)) {
         stateCountMap.put(state, liveNodesNb);
       } else if ("R".equals(num)) {
@@ -179,8 +231,8 @@ public class ConstraintBasedAssignment {
     }
 
     // get state count for R
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       if ("R".equals(num)) {
         stateCountMap.put(state, replicas);
         // should have at most one state using R

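For reference, the refactored helpers above compose the same way mapDroppedResource does later in this patch. A minimal sketch under that assumption (AssignmentSketch and its parameters are illustrative; the ConstraintBasedAssignment calls and Cluster accessors are the ones shown in the diff):

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.StateModelDefinition;

public class AssignmentSketch {
  /**
   * Compute participant->state for one partition. The cluster snapshot, state model
   * definition, raw preference list and current state map are assumed to come from
   * the controller pipeline; only the calls below are taken from the patch.
   */
  public static Map<ParticipantId, State> assignPartition(Cluster cluster, ResourceId resourceId,
      PartitionId partitionId, StateModelDefinition stateModelDef,
      List<ParticipantId> rawPreferenceList, Map<ParticipantId, State> currentStateMap) {
    // participants that are disabled, either entirely or for this partition
    Set<ParticipantId> disabled =
        ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partitionId);
    // per-state upper bounds ("N", "R", or a number) from the cluster config
    Map<State, String> upperBounds =
        ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId, cluster.getConfig());
    // expand ANY_LIVEINSTANCE into a sorted list of live participants if necessary
    List<ParticipantId> preferenceList =
        ConstraintBasedAssignment.getPreferenceList(cluster, partitionId, rawPreferenceList);
    // participant -> state for this partition
    return ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
        cluster.getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
        disabled);
  }
}
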
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
deleted file mode 100644
index f703073..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.apache.helix.controller.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Collection of functions that will compute the best possible state based on the participants and
- * the rebalancer configuration of a resource.
- */
-public class NewConstraintBasedAssignment {
-  private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
-
-  /**
-   * Get a set of disabled participants for a partition
-   * @param participantMap map of all participants
-   * @param partitionId the partition to check
-   * @return a set of all participants that are disabled for the partition
-   */
-  public static Set<ParticipantId> getDisabledParticipants(
-      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
-    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
-    Set<ParticipantId> disabledParticipantsForPartition =
-        Sets.filter(participantSet, new Predicate<ParticipantId>() {
-          @Override
-          public boolean apply(ParticipantId participantId) {
-            Participant participant = participantMap.get(participantId);
-            return !participant.isEnabled()
-                || participant.getDisabledPartitionIds().contains(partitionId);
-          }
-        });
-    return disabledParticipantsForPartition;
-  }
-
-  /**
-   * Get an ordered list of participants that can serve a partition
-   * @param cluster cluster snapshot
-   * @param partitionId the partition to look up
-   * @param config rebalancing constraints
-   * @return list with most preferred participants first
-   */
-  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
-      List<ParticipantId> prefList) {
-    if (prefList != null && prefList.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
-      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      Collections.sort(prefList);
-    }
-    return prefList;
-  }
-
-  /**
-   * Get a map of state to upper bound constraint given a cluster
-   * @param stateModelDef the state model definition to check
-   * @param resourceId the resource that is constraint
-   * @param cluster the cluster the resource belongs to
-   * @return map of state to upper bound
-   */
-  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
-      ResourceId resourceId, ClusterConfig cluster) {
-    Map<State, String> stateMap = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
-      stateMap.put(state, num);
-    }
-    return stateMap;
-  }
-
-  /**
-   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
-   * @param upperBounds map of state to upper bound
-   * @param liveParticipantSet set of live participant ids
-   * @param stateModelDef
-   * @param participantPreferenceList
-   * @param currentStateMap
-   *          : participant->state for each partition
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
-      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
-      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, instancePreferenceList will be empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((participantPreferenceList == null || !participantPreferenceList
-            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // resource is deleted
-    if (participantPreferenceList == null) {
-      return participantStateMap;
-    }
-
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-    boolean assigned[] = new boolean[participantPreferenceList.size()];
-
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      int stateCount = -1;
-      if ("N".equals(num)) {
-        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
-        liveAndEnabled.removeAll(disabledParticipantsForPartition);
-        stateCount = liveAndEnabled.size();
-      } else if ("R".equals(num)) {
-        stateCount = participantPreferenceList.size();
-      } else {
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          logger.error("Invalid count for state:" + state + " ,count=" + num);
-        }
-      }
-      if (stateCount > -1) {
-        int count = 0;
-        for (int i = 0; i < participantPreferenceList.size(); i++) {
-          ParticipantId participantId = participantPreferenceList.get(i);
-
-          boolean notInErrorState =
-              currentStateMap == null
-                  || currentStateMap.get(participantId) == null
-                  || !currentStateMap.get(participantId)
-                      .equals(State.from(HelixDefinedState.ERROR));
-
-          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
-              && !disabledParticipantsForPartition.contains(participantId)) {
-            participantStateMap.put(participantId, state);
-            count = count + 1;
-            assigned[i] = true;
-            if (count == stateCount) {
-              break;
-            }
-          }
-        }
-      }
-    }
-    return participantStateMap;
-  }
-
-  /**
-   * Get the number of replicas that should be in each state for a partition
-   * @param upperBounds map of state to upper bound
-   * @param stateModelDef StateModelDefinition object
-   * @param liveNodesNb number of live nodes
-   * @param total number of replicas
-   * @return state count map: state->count
-   */
-  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
-      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-
-    int replicas = totalReplicas;
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("N".equals(num)) {
-        stateCountMap.put(state, liveNodesNb);
-      } else if ("R".equals(num)) {
-        // wait until we get the counts for all other states
-        continue;
-      } else {
-        int stateCount = -1;
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          // LOG.error("Invalid count for state: " + state + ", count: " + num +
-          // ", use -1 instead");
-        }
-
-        if (stateCount > 0) {
-          stateCountMap.put(state, stateCount);
-          replicas -= stateCount;
-        }
-      }
-    }
-
-    // get state count for R
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("R".equals(num)) {
-        stateCountMap.put(state, replicas);
-        // should have at most one state using R
-        break;
-      }
-    }
-    return stateCountMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 9699dcb..051a2f3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,114 +20,123 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
  */
-@Deprecated
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
-  private static final Logger logger = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+  private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    logger.info("START BestPossibleStateCalcStage.process()");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("START BestPossibleStateCalcStage.process()");
+    }
 
-    CurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
 
-    if (currentStateOutput == null || resourceMap == null || cache == null) {
+    if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
     BestPossibleStateOutput bestPossibleStateOutput =
-        compute(event, resourceMap, currentStateOutput);
+        compute(cluster, event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    }
   }
 
-  private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput) {
-    // for each ideal state
-    // read the state model def
-    // for each resource
-    // get the preference list
-    // for each instanceName check if its alive then assign a state
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+  /**
+   * Fallback for cases when the resource has been dropped, but current state exists
+   * @param cluster cluster snapshot
+   * @param resourceId the resource for which to generate an assignment
+   * @param currentStateOutput full snapshot of the current state
+   * @param stateModelDef state model the resource follows
+   * @return assignment for the dropped resource
+   */
+  private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+      ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+    Set<? extends PartitionId> mappedPartitions =
+        currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+    if (mappedPartitions == null) {
+      return partitionMapping;
+    }
+    for (PartitionId partitionId : mappedPartitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partitionId);
+      Map<State, String> upperBounds =
+          ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
+              cluster.getConfig());
+      partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
+          .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+              disabledParticipantsForPartition));
+    }
+    return partitionMapping;
+  }
 
+  private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+      Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
 
-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
-
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
-
-      if (idealState == null) {
-        // if ideal state is deleted, use an empty one
-        logger.info("resource:" + resourceName + " does not exist anymore");
-        idealState = new IdealState(resourceName);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing resource:" + resourceId);
       }
-
-      Rebalancer rebalancer = null;
-      if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && idealState.getRebalancerRef() != null) {
-        String rebalancerClassName = idealState.getRebalancerRef().toString();
-        logger
-            .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          rebalancer =
-              (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-        } catch (Exception e) {
-          logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+      ResourceConfig resourceConfig = resourceMap.get(resourceId);
+      RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+      ResourceAssignment resourceAssignment = null;
+      if (rebalancerConfig != null) {
+        HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+        if (rebalancer != null) {
+          HelixManager manager = event.getAttribute("helixmanager");
+          rebalancer.init(manager);
+          resourceAssignment =
+              rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
         }
       }
-      if (rebalancer == null) {
-        if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
-          rebalancer = new AutoRebalancer();
-        } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-          rebalancer = new SemiAutoRebalancer();
-        } else {
-          rebalancer = new CustomRebalancer();
-        }
+      if (resourceAssignment == null) {
+        RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+        StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+        resourceAssignment =
+            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
       }
 
-      HelixManager manager = event.getAttribute("helixmanager");
-      rebalancer.init(manager);
-      ResourceAssignment partitionStateAssignment =
-          rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
-      for (Partition partition : resource.getPartitions()) {
-        Map<ParticipantId, State> newStateMap =
-            partitionStateAssignment.getReplicaMap(PartitionId.from(partition.getPartitionName()));
-        output.setParticipantStateMap(resourceName, partition, newStateMap);
-      }
+      output.setResourceAssignment(resourceId, resourceAssignment);
     }
+
     return output;
   }
 }

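The calc stage now hands everything to the HelixRebalancer supplied by each resource's RebalancerConfig via init(...) and computeResourceMapping(...). A rough sketch of a pluggable rebalancer inferred only from those two calls (EmptyAssignmentRebalancer is illustrative, the exact HelixRebalancer contract may differ, and getResourceId() on the context is an assumed accessor):

import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;

// Illustrative sketch, not part of this patch.
public class EmptyAssignmentRebalancer implements HelixRebalancer {
  private HelixManager _manager;

  @Override
  public void init(HelixManager manager) {
    // the stage calls this before computeResourceMapping on every pipeline run
    _manager = manager;
  }

  @Override
  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
      Cluster cluster, ResourceCurrentState currentState) {
    RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
    // assumed accessor: the context is expected to know which resource it describes
    ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
    // a real rebalancer would call assignment.addReplicaMap(partitionId, replicaMap)
    // for every partition, e.g. via ConstraintBasedAssignment as in mapDroppedResource
    return assignment;
  }
}
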
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 362bbb6..afcb6f7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -1,82 +1,49 @@
 package org.apache.helix.controller.stages;
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
 
-@Deprecated
-public class BestPossibleStateOutput {
-  // resource->partition->instance->state
-  Map<String, Map<Partition, Map<String, String>>> _dataMap;
+import com.google.common.collect.Maps;
 
-  public BestPossibleStateOutput() {
-    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-  }
+public class BestPossibleStateOutput {
 
-  public void setState(String resourceName, Partition resource,
-      Map<String, String> bestInstanceStateMappingForResource) {
-    if (!_dataMap.containsKey(resourceName)) {
-      _dataMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
-    }
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    map.put(resource, bestInstanceStateMappingForResource);
-  }
+  Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
 
-  public void setParticipantStateMap(String resourceName, Partition partition,
-      Map<ParticipantId, State> bestInstanceStateMappingForResource) {
-    Map<String, String> rawStateMap = new HashMap<String, String>();
-    for (ParticipantId participantId : bestInstanceStateMappingForResource.keySet()) {
-      rawStateMap.put(participantId.stringify(),
-          bestInstanceStateMappingForResource.get(participantId).toString());
-    }
-    setState(resourceName, partition, rawStateMap);
+  public BestPossibleStateOutput() {
+    _resourceAssignmentMap = Maps.newHashMap();
   }
 
-  public Map<String, String> getInstanceStateMap(String resourceName, Partition resource) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null) {
-      return map.get(resource);
-    }
-    return Collections.emptyMap();
+  /**
+   * Set the computed resource assignment for a resource
+   * @param resourceId the resource to set
+   * @param resourceAssignment the computed assignment
+   */
+  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    _resourceAssignmentMap.put(resourceId, resourceAssignment);
   }
 
-  public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null) {
-      return map;
-    }
-    return Collections.emptyMap();
+  /**
+   * Get the resource assignment computed for a resource
+   * @param resourceId resource to look up
+   * @return ResourceAssignment computed by the best possible state calculation
+   */
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _resourceAssignmentMap.get(resourceId);
   }
 
-  public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
-    return _dataMap;
+  /**
+   * Get all of the resources currently assigned
+   * @return set of assigned resource ids
+   */
+  public Set<ResourceId> getAssignedResources() {
+    return _resourceAssignmentMap.keySet();
   }
 
   @Override
   public String toString() {
-    return _dataMap.toString();
+    return _resourceAssignmentMap.toString();
   }
 }

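With the output keyed by ResourceId and holding whole ResourceAssignment objects, a downstream consumer might read it roughly like this (BestPossibleStateDumper is illustrative; getMappedPartitionIds() is an assumed ResourceAssignment accessor, while getAssignedResources, getResourceAssignment and getReplicaMap appear in the patch):

import java.util.Map;

import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ResourceAssignment;

public class BestPossibleStateDumper {
  // Illustrative: walk every assignment computed by BestPossibleStateCalcStage.
  public static void dump(BestPossibleStateOutput output) {
    for (ResourceId resourceId : output.getAssignedResources()) {
      ResourceAssignment assignment = output.getResourceAssignment(resourceId);
      for (PartitionId partitionId : assignment.getMappedPartitionIds()) { // assumed accessor
        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
        System.out.println(resourceId + " " + partitionId + " -> " + replicaMap);
      }
    }
  }
}
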
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 64e881c..532ecb5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -23,10 +23,12 @@ import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.api.Cluster;
 import org.apache.helix.api.HelixVersion;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 
 /**
@@ -38,16 +40,17 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (manager == null || cache == null) {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (manager == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager | DataCache");
     }
 
     HelixManagerProperties properties = manager.getProperties();
-    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-    for (LiveInstance liveInstance : liveInstanceMap.values()) {
-      HelixVersion version = liveInstance.getTypedHelixVersion();
+    // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    for (Participant liveParticipant : liveParticipants.values()) {
+      HelixVersion version = liveParticipant.getRunningInstance().getVersion();
       String participantVersion = (version != null) ? version.toString() : null;
       if (!properties.isParticipantCompatible(participantVersion)) {
         String errorMsg =
@@ -55,7 +58,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
                 + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
                 + ", minimumSupportedParticipantVersion: "
                 + properties.getProperty("minimum_supported_version.participant")
-                + ", participant: " + liveInstance.getInstanceName() + ", participantVersion: "
+                + ", participant: " + liveParticipant.getId() + ", participantVersion: "
                 + participantVersion;
         LOG.error(errorMsg);
         throw new StageException(errorMsg);

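The compatibility gate now reads each participant's version from its running instance. The same check can be phrased as a query rather than a hard failure; a small sketch (CompatibilitySketch is illustrative, every accessor it uses appears in the hunk above):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixManagerProperties;
import org.apache.helix.api.HelixVersion;
import org.apache.helix.api.Participant;
import org.apache.helix.api.id.ParticipantId;

public class CompatibilitySketch {
  // Illustrative: report all incompatible participants rather than failing on the first.
  public static Set<ParticipantId> findIncompatibleParticipants(
      HelixManagerProperties properties, Map<ParticipantId, Participant> liveParticipants) {
    Set<ParticipantId> incompatible = new HashSet<ParticipantId>();
    for (Participant participant : liveParticipants.values()) {
      HelixVersion version = participant.getRunningInstance().getVersion();
      String participantVersion = (version != null) ? version.toString() : null;
      if (!properties.isParticipantCompatible(participantVersion)) {
        incompatible.add(participant.getId());
      }
    }
    return incompatible;
  }
}
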
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 7036512..5730289 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -22,59 +22,68 @@ package org.apache.helix.controller.stages;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
  * sessionId from LiveInstance Get Partition,State for all the resources computed in
  * previous State [ResourceComputationStage]
  */
-@Deprecated
 public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
-    if (cache == null || resourceMap == null) {
+    if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires DataCache|RESOURCE");
     }
 
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
 
-    for (LiveInstance instance : liveInstances.values()) {
-      String instanceName = instance.getInstanceName();
-      Map<String, Message> instanceMessages = cache.getMessages(instanceName);
-      for (Message message : instanceMessages.values()) {
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      ParticipantId participantId = liveParticipant.getId();
+
+      // add pending messages
+      Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
+      for (Message message : instanceMsgs.values()) {
         if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
           continue;
         }
-        if (!instance.getTypedSessionId().equals(message.getTypedTgtSessionId())) {
+
+        if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
           continue;
         }
+
         ResourceId resourceId = message.getResourceId();
-        Resource resource = resourceMap.get(resourceId.stringify());
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
 
         if (!message.getBatchMessageMode()) {
           PartitionId partitionId = message.getPartitionId();
-          Partition partition = resource.getPartition(partitionId.stringify());
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
-            currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
-                message.getTypedToState().toString());
+            currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+                message.getTypedToState());
           } else {
             // log
           }
@@ -82,10 +91,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           List<PartitionId> partitionNames = message.getPartitionIds();
           if (!partitionNames.isEmpty()) {
             for (PartitionId partitionId : partitionNames) {
-              Partition partition = resource.getPartition(partitionId.stringify());
+              Partition partition = resource.getSubUnit(partitionId);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
-                    message.getTypedToState().toString());
+                currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+                    message.getTypedToState());
               } else {
                 // log
               }
@@ -93,43 +102,41 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           }
         }
       }
-    }
-    for (LiveInstance instance : liveInstances.values()) {
-      String instanceName = instance.getInstanceName();
-
-      String clientSessionId = instance.getTypedSessionId().stringify();
-      Map<String, CurrentState> currentStateMap =
-          cache.getCurrentState(instanceName, clientSessionId);
-      for (CurrentState currentState : currentStateMap.values()) {
 
-        if (!instance.getTypedSessionId().equals(currentState.getTypedSessionId())) {
+      // add current state
+      SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+      Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
+      for (CurrentState curState : curStateMap.values()) {
+        if (!sessionId.equals(curState.getTypedSessionId())) {
           continue;
         }
-        String resourceName = currentState.getResourceName();
-        String stateModelDefName = currentState.getStateModelDefRef();
-        Resource resource = resourceMap.get(resourceName);
+
+        ResourceId resourceId = curState.getResourceId();
+        StateModelDefId stateModelDefId = curState.getStateModelDefId();
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
-        if (stateModelDefName != null) {
-          currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
+
+        if (stateModelDefId != null) {
+          currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
         }
 
-        currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
+        currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
 
-        Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
-        for (String partitionName : partitionStateMap.keySet()) {
-          Partition partition = resource.getPartition(partitionName);
+        Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
+        for (PartitionId partitionId : partitionStateMap.keySet()) {
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
-            currentStateOutput.setCurrentState(resourceName, partition, instanceName,
-                currentState.getState(partitionName));
-
+            currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+                curState.getState(partitionId));
           } else {
             // log
           }
         }
       }
     }
+
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
   }
 }

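Downstream stages consume the ResourceCurrentState built here through getters used elsewhere in this patch; a minimal read-side sketch (CurrentStateSketch is illustrative):

import java.util.Map;
import java.util.Set;

import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.stages.ResourceCurrentState;

public class CurrentStateSketch {
  // Illustrative: read back the per-partition states recorded for one resource.
  public static void printCurrentState(ResourceCurrentState currentStateOutput,
      ResourceId resourceId) {
    Set<? extends PartitionId> partitions =
        currentStateOutput.getCurrentStateMappedPartitions(resourceId);
    if (partitions == null) {
      return; // no participant has reported state for this resource yet
    }
    for (PartitionId partitionId : partitions) {
      Map<ParticipantId, State> stateMap =
          currentStateOutput.getCurrentStateMap(resourceId, partitionId);
      System.out.println(partitionId + " -> " + stateMap);
    }
  }
}
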
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 3c3a9d9..55a5e54 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -34,33 +34,39 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.StatusUpdate;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class ExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+  private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    log.info("START ExternalViewComputeStage.process()");
+    LOG.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
 
-    if (manager == null || resourceMap == null || cache == null) {
+    if (manager == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires ClusterManager|RESOURCES|DataCache");
     }
@@ -68,58 +74,64 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
 
-    CurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
 
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();
     List<PropertyKey> keys = new ArrayList<PropertyKey>();
 
+    // TODO use external-view accessor
     Map<String, ExternalView> curExtViews =
         dataAccessor.getChildValuesMap(keyBuilder.externalViews());
 
-    for (String resourceName : resourceMap.keySet()) {
-      ExternalView view = new ExternalView(resourceName);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ExternalView view = new ExternalView(resourceId.stringify());
       // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       // if resource ideal state has bucket size, set it
       // otherwise resource has been dropped, use bucket size from current state instead
-      Resource resource = resourceMap.get(resourceName);
+      ResourceConfig resource = resourceMap.get(resourceId);
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
+
       if (resource.getBucketSize() > 0) {
         view.setBucketSize(resource.getBucketSize());
       } else {
-        view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+        view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
       }
-
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceId, partitionId);
         if (currentStateMap != null && currentStateMap.size() > 0) {
           // Set<String> disabledInstances
           // = cache.getDisabledInstancesForResource(resource.toString());
-          for (String instance : currentStateMap.keySet()) {
+          for (ParticipantId participantId : currentStateMap.keySet()) {
             // if (!disabledInstances.contains(instance))
             // {
-            view.setState(partition.getPartitionName(), instance, currentStateMap.get(instance));
+            view.setState(partitionId.stringify(), participantId.stringify(),
+                currentStateMap.get(participantId).toString());
             // }
           }
         }
       }
+
+      // TODO fix this
       // Update cluster status monitor mbean
-      ClusterStatusMonitor clusterStatusMonitor =
-          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
-      if (idealState != null) {
-        if (clusterStatusMonitor != null
-            && !idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          clusterStatusMonitor.onExternalViewChange(view,
-              cache._idealStateMap.get(view.getResourceName()));
-        }
-      }
+      // ClusterStatusMonitor clusterStatusMonitor =
+      // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+      // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      // if (idealState != null) {
+      // if (clusterStatusMonitor != null
+      // && !idealState.getStateModelDefRef().equalsIgnoreCase(
+      // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+      // clusterStatusMonitor.onExternalViewChange(view,
+      // cache._idealStateMap.get(view.getResourceName()));
+      // }
+      // }
 
       // compare the new external view with current one, set only on different
-      ExternalView curExtView = curExtViews.get(resourceName);
+      ExternalView curExtView = curExtViews.get(resourceId.stringify());
       if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
-        keys.add(keyBuilder.externalView(resourceName));
+        keys.add(keyBuilder.externalView(resourceId.stringify()));
         newExtViews.add(view);
 
         // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
@@ -127,10 +139,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        if (idealState != null
-            && idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          updateScheduledTaskStatus(view, manager, idealState);
+        RebalancerContext rebalancerContext =
+            (rebalancerConfig != null) ? rebalancerConfig
+                .getRebalancerContext(RebalancerContext.class) : null;
+        if (rebalancerContext != null
+            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+                StateModelDefId.SchedulerTaskQueue)) {
+          updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }
       }
     }
@@ -144,18 +159,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
 
     // remove dead external-views
     for (String resourceName : curExtViews.keySet()) {
-      if (!resourceMap.keySet().contains(resourceName)) {
+      if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
       }
     }
 
     long endTime = System.currentTimeMillis();
-    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
   }
 
-  private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
-      IdealState taskQueueIdealState) {
+  // TODO fix it
+  private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
+      HelixManager manager, SchedulerTaskConfig schedulerTaskConfig) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
     ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
 
     // Place holder for finished partitions
@@ -166,23 +184,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     Map<String, Map<String, String>> controllerMsgUpdates =
         new HashMap<String, Map<String, String>>();
 
-    Builder keyBuilder = accessor.keyBuilder();
-
     for (String taskPartitionName : ev.getPartitionSet()) {
       for (String taskState : ev.getStateMap(taskPartitionName).values()) {
         if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
             || taskState.equalsIgnoreCase("COMPLETED")) {
-          log.info(taskPartitionName + " finished as " + taskState);
-          finishedTasks.getListFields().put(taskPartitionName, emptyList);
-          finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+          LOG.info(taskPartitionName + " finished as " + taskState);
+          finishedTasks.setListField(taskPartitionName, emptyList);
+          finishedTasks.setMapField(taskPartitionName, emptyMap);
 
           // Update original scheduler message status update
-          if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) {
-            String controllerMsgId =
-                taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                    .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+          if (innerMessage != null) {
+            String controllerMsgId = innerMessage.getControllerMessageId();
             if (controllerMsgId != null) {
-              log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+              LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
               if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
               }
@@ -193,16 +209,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       }
     }
     // fill the controllerMsgIdCountMap
-    for (String taskId : taskQueueIdealState.getPartitionSet()) {
-      String controllerMsgId =
-          taskQueueIdealState.getRecord().getMapField(taskId)
-              .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+    for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
+      Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
+      String controllerMsgId = innerMessage.getControllerMessageId();
+
       if (controllerMsgId != null) {
-        if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) {
-          controllerMsgIdCountMap.put(controllerMsgId, 0);
+        Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
+        if (curCnt == null) {
+          curCnt = 0;
         }
-        controllerMsgIdCountMap.put(controllerMsgId,
-            (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+        controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
       }
     }
 
@@ -212,18 +228,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
             keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
         StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
         for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+
           Map<String, String> result = new HashMap<String, String>();
           result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
           controllerStatusUpdate.getRecord().setMapField(
-              "MessageResult "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.TGT_NAME.toString())
-                  + " "
-                  + taskPartitionName
-                  + " "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.MSG_ID.toString()), result);
+              "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
+                  + innerMessage.getMessageId(), result);
         }
+
         // All done for the scheduled tasks that came from controllerMsgId, add summary for it
         if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
             controllerMsgId).intValue()) {
@@ -255,12 +269,12 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
       List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
       deltaList.add(znDelta);
-      IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+      IdealState delta = new IdealState(resourceId);
       delta.setDeltaList(deltaList);
 
       // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
       keyBuilder = accessor.keyBuilder();
-      accessor.updateProperty(keyBuilder.idealState(taskQueueIdealState.getResourceName()), delta);
+      accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
     }
   }
 

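The hunk above prunes finished scheduler-task partitions by writing a SUBTRACT delta against the task queue's ideal state rather than rewriting the whole record. A minimal, self-contained sketch of that pattern follows; the class and method names here are illustrative only, and the final updateProperty() call is left as a comment because it needs a live HelixDataAccessor.

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.model.IdealState;

public class FinishedTaskPruneSketch {
  /**
   * Build an IdealState delta that, when applied with updateProperty(), removes the
   * given finished task partitions from the scheduler task queue ideal state.
   */
  public static IdealState buildSubtractDelta(String resourceName, List<String> finishedPartitions) {
    // Record whose list/map field keys name the partitions to subtract
    ZNRecord finishedTasks = new ZNRecord(resourceName);
    for (String partition : finishedPartitions) {
      finishedTasks.setListField(partition, Collections.<String> emptyList());
      finishedTasks.setMapField(partition, new HashMap<String, String>());
    }
    // SUBTRACT removes matching fields instead of merging them in
    ZNRecordDelta delta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
    List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
    deltaList.add(delta);
    IdealState idealStateDelta = new IdealState(resourceName);
    idealStateDelta.setDeltaList(deltaList);
    // caller: accessor.updateProperty(keyBuilder.idealState(resourceName), idealStateDelta);
    return idealStateDelta;
  }
}

Applying the change as a delta avoids overwriting the whole ideal state, so concurrent additions to the task queue are less likely to be clobbered.
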
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
deleted file mode 100644
index 3056cd5..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-@Deprecated
-public class MessageGenerationPhase extends AbstractBaseStage {
-  private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
-    }
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    Map<String, String> sessionIdMap = new HashMap<String, String>();
-
-    for (LiveInstance liveInstance : liveInstances.values()) {
-      sessionIdMap
-          .put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId().stringify());
-    }
-    MessageGenerationOutput output = new MessageGenerationOutput();
-
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      int bucketSize = resource.getBucketSize();
-
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> instanceStateMap =
-            bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
-
-        // we should generate message based on the desired-state priority
-        // so keep generated messages in a temp map keyed by state
-        // desired-state->list of generated-messages
-        Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
-
-        for (String instanceName : instanceStateMap.keySet()) {
-          String desiredState = instanceStateMap.get(instanceName);
-
-          String currentState =
-              currentStateOutput.getCurrentState(resourceName, partition, instanceName);
-          if (currentState == null) {
-            currentState = stateModelDef.getInitialState();
-          }
-
-          if (desiredState.equalsIgnoreCase(currentState)) {
-            continue;
-          }
-
-          String pendingState =
-              currentStateOutput.getPendingState(resourceName, partition, instanceName);
-
-          String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null) {
-            logger.error("Unable to find a next state for partition: "
-                + partition.getPartitionName() + " from stateModelDefinition"
-                + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
-            continue;
-          }
-
-          if (pendingState != null) {
-            if (nextState.equalsIgnoreCase(pendingState)) {
-              logger.debug("Message already exists for " + instanceName + " to transit "
-                  + partition.getPartitionName() + " from " + currentState + " to " + nextState);
-            } else if (currentState.equalsIgnoreCase(pendingState)) {
-              logger.info("Message hasn't been removed for " + instanceName + " to transit"
-                  + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
-                  + desiredState);
-            } else {
-              logger.info("IdealState changed before state transition completes for "
-                  + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
-                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
-            }
-          } else {
-            Message message =
-                createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
-                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
-                    resource.getStateModelFactoryname(), bucketSize);
-            IdealState idealState = cache.getIdealState(resourceName);
-            if (idealState != null
-                && idealState.getStateModelDefRef().equalsIgnoreCase(
-                    DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                    idealState.getRecord().getMapField(partition.getPartitionName()));
-              }
-            }
-            // Set timeout of needed
-            String stateTransition =
-                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-            if (idealState != null) {
-              String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-              if (timeOutStr == null
-                  && idealState.getStateModelDefRef().equalsIgnoreCase(
-                      DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-                // scheduled task queue
-                if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                  timeOutStr =
-                      idealState.getRecord().getMapField(partition.getPartitionName())
-                          .get(Message.Attributes.TIMEOUT.toString());
-                }
-              }
-              if (timeOutStr != null) {
-                try {
-                  int timeout = Integer.parseInt(timeOutStr);
-                  if (timeout > 0) {
-                    message.setExecutionTimeout(timeout);
-                  }
-                } catch (Exception e) {
-                  logger.error("", e);
-                }
-              }
-            }
-            message.getRecord().setSimpleField("ClusterEventName", event.getName());
-            // output.addMessage(resourceName, partition, message);
-            if (!messageMap.containsKey(desiredState)) {
-              messageMap.put(desiredState, new ArrayList<Message>());
-            }
-            messageMap.get(desiredState).add(message);
-          }
-        }
-
-        // add generated messages to output according to state priority
-        List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-        for (String state : statesPriorityList) {
-          if (messageMap.containsKey(state)) {
-            for (Message message : messageMap.get(state)) {
-              output.addMessage(resourceName, partition, message);
-            }
-          }
-        }
-
-      } // end of for-each-partition
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
-  }
-
-  private Message createMessage(HelixManager manager, String resourceName, String partitionName,
-      String instanceName, String currentState, String nextState, String sessionId,
-      String stateModelDefName, String stateModelFactoryName, int bucketSize) {
-    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionId(PartitionId.from(partitionName));
-    message.setResourceId(ResourceId.from(resourceName));
-    message.setFromState(State.from(currentState));
-    message.setToState(State.from(nextState));
-    message.setTgtSessionId(SessionId.from(sessionId));
-    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
-    message.setStateModelDef(StateModelDefId.from(stateModelDefName));
-    message.setStateModelFactoryName(stateModelFactoryName);
-    message.setBucketSize(bucketSize);
-
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
new file mode 100644
index 0000000..d6fe8c3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -0,0 +1,213 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the current state and pending state with the best possible state and generates the required state transition messages
+ */
+public class MessageGenerationStage extends AbstractBaseStage {
+  private static Logger LOG = Logger.getLogger(MessageGenerationStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ResourceCurrentState currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
+        || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+    }
+
+    MessageOutput output = new MessageOutput();
+
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ResourceConfig resourceConfig = resourceMap.get(resourceId);
+      int bucketSize = resourceConfig.getBucketSize();
+
+      RebalancerContext rebalancerCtx =
+          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+
+      ResourceAssignment resourceAssignment =
+          bestPossibleStateOutput.getResourceAssignment(resourceId);
+      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
+
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (ParticipantId participantId : instanceStateMap.keySet()) {
+          State desiredState = instanceStateMap.get(participantId);
+
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
+          if (currentState == null) {
+            currentState = stateModelDef.getTypedInitialState();
+          }
+
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
+
+          // TODO fix it
+          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + subUnitId
+                + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
+                + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
+                  + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit "
+                  + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + subUnitId
+                  + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
+                  + currentState + ", nextState: " + nextState);
+            }
+          } else {
+            // TODO check if instance is alive
+            SessionId sessionId =
+                cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                    .getSessionId();
+            RebalancerContext rebalancerContext =
+                resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+            Message message =
+                createMessage(manager, resourceId, subUnitId, participantId, currentState,
+                    nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
+                    rebalancerContext.getStateModelFactoryId(), bucketSize);
+
+            // TODO refactor get/set timeout/inner-message
+            if (rebalancerContext != null
+                && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+                    StateModelDefId.SchedulerTaskQueue)) {
+              if (resourceConfig.getSubUnitMap().size() > 0) {
+                // TODO refactor it -- we need a way to read in scheduler tasks a priori
+                Message innerMsg =
+                    resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId);
+                if (innerMsg != null) {
+                  message.setInnerMessage(innerMsg);
+                }
+              }
+            }
+
+            // Set timeout if needed
+            String stateTransition =
+                String.format("%s-%s_%s", currentState, nextState,
+                    Message.Attributes.TIMEOUT.name());
+            SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
+            if (schedulerTaskConfig != null) {
+              int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
+              if (timeout > 0) {
+                message.setExecutionTimeout(timeout);
+              }
+            }
+            message.setClusterEvent(event);
+
+            if (!messageMap.containsKey(desiredState)) {
+              messageMap.put(desiredState, new ArrayList<Message>());
+            }
+            messageMap.get(desiredState).add(message);
+          }
+        }
+
+        // add generated messages to output according to state priority
+        List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+        for (State state : statesPriorityList) {
+          if (messageMap.containsKey(state)) {
+            for (Message message : messageMap.get(state)) {
+              output.addMessage(resourceId, subUnitId, message);
+            }
+          }
+        }
+
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+    // System.out.println("output: " + output);
+  }
+
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId participantSessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(participantId.stringify());
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionId(partitionId);
+    message.setResourceId(resourceId);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(participantSessionId);
+    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
+    message.setStateModelDef(stateModelDefId);
+    message.setStateModelFactoryId(stateModelFactoryId);
+    message.setBucketSize(bucketSize);
+
+    return message;
+  }
+}

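The new stage above keeps generated messages in a temporary map keyed by desired state and only then flushes them according to the state model's priority list. A small sketch of that ordering step (the class and method names, and the MASTER/SLAVE example in the comment, are assumptions made for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.helix.api.State;
import org.apache.helix.model.Message;

public class StatePriorityOrderingSketch {
  /**
   * Flatten a desired-state -> messages map into one list, honoring the state model's
   * priority order, so that e.g. transitions toward MASTER are emitted before
   * transitions toward SLAVE in the same pipeline run.
   */
  public static List<Message> orderByStatePriority(Map<State, List<Message>> messageMap,
      List<State> statesPriorityList) {
    List<Message> ordered = new ArrayList<Message>();
    for (State state : statesPriorityList) {
      if (messageMap.containsKey(state)) {
        ordered.addAll(messageMap.get(state));
      }
    }
    return ordered;
  }
}
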
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
new file mode 100644
index 0000000..9c8c154
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.Message;
+
+public class MessageOutput {
+
+  private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
+
+  public MessageOutput() {
+    _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
+
+  }
+
+  public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
+    if (!_messagesMap.containsKey(resourceId)) {
+      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
+    }
+    if (!_messagesMap.get(resourceId).containsKey(partitionId)) {
+      _messagesMap.get(resourceId).put(partitionId, new ArrayList<Message>());
+
+    }
+    _messagesMap.get(resourceId).get(partitionId).add(message);
+
+  }
+
+  public void setMessages(ResourceId resourceId, PartitionId partitionId,
+      List<Message> selectedMessages) {
+    if (!_messagesMap.containsKey(resourceId)) {
+      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
+    }
+    _messagesMap.get(resourceId).put(partitionId, selectedMessages);
+
+  }
+
+  public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
+    Map<PartitionId, List<Message>> map = _messagesMap.get(resourceId);
+    if (map != null) {
+      return map.get(partitionId);
+    }
+    return Collections.emptyList();
+
+  }
+
+  public Map<PartitionId, List<Message>> getMessages(ResourceId resourceId) {
+    return _messagesMap.get(resourceId);
+  }
+
+  @Override
+  public String toString() {
+    return _messagesMap.toString();
+  }
+}

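A brief usage sketch for the new MessageOutput container; the resource and partition names and the wrapper class are illustrative assumptions. Messages are accumulated per resource and partition, and a later stage (for example message selection or throttling) can replace a partition's list wholesale via setMessages().

import java.util.Collections;
import java.util.UUID;

import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.stages.MessageOutput;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;

public class MessageOutputUsageSketch {
  public static void main(String[] args) {
    MessageOutput output = new MessageOutput();
    ResourceId resourceId = ResourceId.from("TestDB");
    PartitionId partitionId = PartitionId.from("TestDB_0");

    // accumulate a state-transition message for one partition
    Message message =
        new Message(MessageType.STATE_TRANSITION, MessageId.from(UUID.randomUUID().toString()));
    output.addMessage(resourceId, partitionId, message);

    System.out.println(output.getMessages(resourceId, partitionId));

    // a downstream stage may overwrite the selection for the partition
    output.setMessages(resourceId, partitionId, Collections.<Message> emptyList());
  }
}
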
