helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [46/53] [abbrv] [HELIX-209] Shuffling around rebalancer code to allow for compatibility
Date Thu, 07 Nov 2013 01:19:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 1a3f37b..4a3fe28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -26,18 +26,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class MessageSelectionStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
 
@@ -73,41 +80,54 @@ public class MessageSelectionStage extends AbstractBaseStage {
     public int getUpperBound() {
       return upper;
     }
+
+    @Override
+    public String toString() {
+      return String.format("%d-%d", lower, upper);
+    }
   }
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput =
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    MessageGenerationOutput messageGenOutput =
-        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
-    if (cache == null || resourceMap == null || currentStateOutput == null
+    MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+    if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
     }
 
-    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+    MessageOutput output = new MessageOutput();
 
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ResourceConfig resource = resourceMap.get(resourceId);
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(resource.getRebalancerConfig()
+              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
 
+      // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
-      IdealState idealState = cache.getIdealState(resourceName);
-      Map<String, Bounds> stateConstraints =
-          computeStateConstraints(stateModelDef, idealState, cache);
+      Resource configResource = cluster.getResource(resourceId);
+
+      // if configResource == null, the resource has been dropped
+      Map<State, Bounds> stateConstraints =
+          computeStateConstraints(stateModelDef,
+              configResource == null ? null : configResource.getRebalancerConfig(), cluster);
 
-      for (Partition partition : resource.getPartitions()) {
-        List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
+      // TODO fix it
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
         List<Message> selectedMessages =
-            selectMessages(cache.getLiveInstances(),
-                currentStateOutput.getCurrentStateMap(resourceName, partition),
-                currentStateOutput.getPendingStateMap(resourceName, partition), messages,
-                stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
-        output.addMessages(resourceName, partition, selectedMessages);
+            selectMessages(cluster.getLiveParticipantMap(),
+                currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+                currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+                stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
+        output.setMessages(resourceId, partitionId, selectedMessages);
       }
     }
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
@@ -132,22 +152,22 @@ public class MessageSelectionStage extends AbstractBaseStage {
    *          : FROME_STATE-TO_STATE -> priority
    * @return: selected messages
    */
-  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-      Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
-      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
-      String initialState) {
+  List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
+      Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
+      List<Message> messages, Map<State, Bounds> stateConstraints,
+      final Map<String, Integer> stateTransitionPriorities, State initialState) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
 
     List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+    Map<State, Bounds> bounds = new HashMap<State, Bounds>();
 
     // count currentState, if no currentState, count as in initialState
-    for (String instance : liveInstances.keySet()) {
-      String state = initialState;
-      if (currentStates.containsKey(instance)) {
-        state = currentStates.get(instance);
+    for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
+      State state = initialState;
+      if (currentStates.containsKey(liveParticipantId)) {
+        state = currentStates.get(liveParticipantId);
       }
 
       if (!bounds.containsKey(state)) {
@@ -158,8 +178,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
     }
 
     // count pendingStates
-    for (String instance : pendingStates.keySet()) {
-      String state = pendingStates.get(instance);
+    for (ParticipantId participantId : pendingStates.keySet()) {
+      State state = pendingStates.get(participantId);
       if (!bounds.containsKey(state)) {
         bounds.put(state, new Bounds(0, 0));
       }
@@ -173,7 +193,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
     for (Message message : messages) {
       State fromState = message.getTypedFromState();
       State toState = message.getTypedToState();
-      String transition = fromState + "-" + toState;
+      String transition = fromState.toString() + "-" + toState.toString();
       int priority = Integer.MAX_VALUE;
 
       if (stateTransitionPriorities.containsKey(transition)) {
@@ -198,7 +218,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
         }
 
         if (!bounds.containsKey(toState)) {
-          bounds.put(toState.toString(), new Bounds(0, 0));
+          bounds.put(toState, new Bounds(0, 0));
         }
 
         // check lower bound of fromState
@@ -236,22 +256,35 @@ public class MessageSelectionStage extends AbstractBaseStage {
    * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
    * beginning and compute the stateConstraint instance once and re use at other places.
    * Each IdealState must have a constraint object associated with it
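+   * @param stateModelDefinition state model whose states, in priority order, are given bounds
+   * @param rebalancerConfig if null (resource dropped), R can't be evaluated, so no constraints
+   * @param cluster cluster snapshot providing live participants and state upper bounds
+   * @return map from state to its bounds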
    */
-  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-      IdealState idealState, ClusterDataCache cache) {
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-
-    List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
-    for (String state : statePriorityList) {
-      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+  private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+      RebalancerConfig rebalancerConfig, Cluster cluster) {
+    ReplicatedRebalancerContext context =
+        (rebalancerConfig != null) ? rebalancerConfig
+            .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+
+    List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
+    for (State state : statePriorityList) {
+      String numInstancesPerState =
+          cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
+              stateModelDefinition.getStateModelDefId(), state);
       int max = -1;
       if ("N".equals(numInstancesPerState)) {
-        max = cache.getLiveInstances().size();
+        max = cluster.getLiveParticipantMap().size();
       } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (idealState != null) {
-          max = cache.getReplicas(idealState.getResourceName());
+        if (context != null) {
+          if (context.anyLiveParticipant()) {
+            max = cluster.getLiveParticipantMap().size();
+          } else {
+            max = context.getReplicaCount();
+          }
         }
       } else {
         try {
@@ -274,7 +307,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
   // so that behavior is consistent
   private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
-    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityStringList();
+    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityList();
     for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
       stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
deleted file mode 100644
index 54ab384..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-
-@Deprecated
-public class MessageSelectionStageOutput {
-  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
-  public MessageSelectionStageOutput() {
-    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-  }
-
-  public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
-    if (!_messagesMap.containsKey(resourceName)) {
-      _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
-    }
-    _messagesMap.get(resourceName).put(partition, selectedMessages);
-
-  }
-
-  public List<Message> getMessages(String resourceName, Partition partition) {
-    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null) {
-      return map.get(partition);
-    }
-    return Collections.emptyList();
-
-  }
-
-  @Override
-  public String toString() {
-    return _messagesMap.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 62fbafe..a7b75a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -26,6 +26,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConstraints;
@@ -34,11 +40,8 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class MessageThrottleStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(MessageThrottleStage.class.getName());
 
@@ -113,39 +116,43 @@ public class MessageThrottleStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    MessageSelectionStageOutput msgSelectionOutput =
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    MessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
-    if (cache == null || resourceMap == null || msgSelectionOutput == null) {
+    if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
       throw new StageException("Missing attributes in event: " + event
           + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
     }
 
-    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+    MessageOutput output = new MessageOutput();
 
-    ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    // TODO fix it
+    ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
     Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
 
     if (constraint != null) {
       // go through all pending messages, they should be counted but not throttled
-      for (String instance : cache.getLiveInstances().keySet()) {
-        throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
-            .values()), false);
+      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
+        Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
+        throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
+            .getMessageMap().values()), false);
       }
     }
 
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions()) {
-        List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ResourceConfig resource = resourceMap.get(resourceId);
+      // TODO fix it
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
         if (constraint != null && messages != null && messages.size() > 0) {
           messages = throttle(throttleCounterMap, constraint, messages, true);
         }
-        output.addMessages(resourceName, partition, messages);
+        output.setMessages(resourceId, partitionId, messages);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
deleted file mode 100644
index 5983eff..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-
-@Deprecated
-public class MessageThrottleStageOutput {
-  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
-  public MessageThrottleStageOutput() {
-    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-  }
-
-  public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
-    if (!_messagesMap.containsKey(resourceName)) {
-      _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
-    }
-    _messagesMap.get(resourceName).put(partition, selectedMessages);
-
-  }
-
-  public List<Message> getMessages(String resourceName, Partition partition) {
-    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null) {
-      return map.get(partition);
-    }
-    return Collections.emptyList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
deleted file mode 100644
index 8b56bec..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.Rebalancer;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * For partition compute best possible (instance,state) pair based on
- * IdealState,StateModel,LiveInstance
- */
-public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("START BestPossibleStateCalcStage.process()");
-    }
-
-    ResourceCurrentState currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-
-    if (currentStateOutput == null || resourceMap == null || cluster == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
-    }
-
-    NewBestPossibleStateOutput bestPossibleStateOutput =
-        compute(cluster, event, resourceMap, currentStateOutput);
-    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
-
-    long endTime = System.currentTimeMillis();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
-    }
-  }
-
-  /**
-   * Fallback for cases when the resource has been dropped, but current state exists
-   * @param cluster cluster snapshot
-   * @param resourceId the resource for which to generate an assignment
-   * @param currentStateOutput full snapshot of the current state
-   * @param stateModelDef state model the resource follows
-   * @return assignment for the dropped resource
-   */
-  private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
-      ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
-    ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
-    Set<? extends PartitionId> mappedPartitions =
-        currentStateOutput.getCurrentStateMappedPartitions(resourceId);
-    if (mappedPartitions == null) {
-      return partitionMapping;
-    }
-    for (PartitionId partitionId : mappedPartitions) {
-      Set<ParticipantId> disabledParticipantsForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partitionId);
-      Map<State, String> upperBounds =
-          NewConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
-              cluster.getConfig());
-      partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
-          .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
-              stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-              disabledParticipantsForPartition));
-    }
-    return partitionMapping;
-  }
-
-  private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
-      Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
-    NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
-    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing resource:" + resourceId);
-      }
-      ResourceConfig resourceConfig = resourceMap.get(resourceId);
-      RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-      ResourceAssignment resourceAssignment = null;
-      if (rebalancerConfig != null) {
-        Rebalancer rebalancer = rebalancerConfig.getRebalancer();
-        if (rebalancer != null) {
-          HelixManager manager = event.getAttribute("helixmanager");
-          rebalancer.init(manager);
-          resourceAssignment =
-              rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
-        }
-      }
-      if (resourceAssignment == null) {
-        RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-        StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
-        resourceAssignment =
-            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
-      }
-
-      output.setResourceAssignment(resourceId, resourceAssignment);
-    }
-
-    return output;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
deleted file mode 100644
index 7720143..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.helix.controller.stages;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.collect.Maps;
-
-public class NewBestPossibleStateOutput {
-
-  Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
-
-  public NewBestPossibleStateOutput() {
-    _resourceAssignmentMap = Maps.newHashMap();
-  }
-
-  /**
-   * Set the computed resource assignment for a resource
-   * @param resourceId the resource to set
-   * @param resourceAssignment the computed assignment
-   */
-  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
-    _resourceAssignmentMap.put(resourceId, resourceAssignment);
-  }
-
-  /**
-   * Get the resource assignment computed for a resource
-   * @param resourceId resource to look up
-   * @return ResourceAssignment computed by the best possible state calculation
-   */
-  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
-    return _resourceAssignmentMap.get(resourceId);
-  }
-
-  /**
-   * Get all of the resources currently assigned
-   * @return set of assigned resource ids
-   */
-  public Set<ResourceId> getAssignedResources() {
-    return _resourceAssignmentMap.keySet();
-  }
-
-  @Override
-  public String toString() {
-    return _resourceAssignmentMap.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
deleted file mode 100644
index ea1a507..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.HelixVersion;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
-
-/**
- * controller checks if participant version is compatible
- */
-public class NewCompatibilityCheckStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewCompatibilityCheckStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    if (manager == null || cluster == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager | DataCache");
-    }
-
-    HelixManagerProperties properties = manager.getProperties();
-    // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
-    for (Participant liveParticipant : liveParticipants.values()) {
-      HelixVersion version = liveParticipant.getRunningInstance().getVersion();
-      String participantVersion = (version != null) ? version.toString() : null;
-      if (!properties.isParticipantCompatible(participantVersion)) {
-        String errorMsg =
-            "incompatible participant. pipeline will not continue. " + "controller: "
-                + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
-                + ", minimumSupportedParticipantVersion: "
-                + properties.getProperty("minimum_supported_version.participant")
-                + ", participant: " + liveParticipant.getId() + ", participantVersion: "
-                + participantVersion;
-        LOG.error(errorMsg);
-        throw new StageException(errorMsg);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
deleted file mode 100644
index f7f2a5f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- */
-public class NewCurrentStateComputationStage extends AbstractBaseStage {
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-
-    if (cluster == null || resourceMap == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCE");
-    }
-
-    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
-
-    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
-      ParticipantId participantId = liveParticipant.getId();
-
-      // add pending messages
-      Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
-      for (Message message : instanceMsgs.values()) {
-        if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
-          continue;
-        }
-
-        if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
-          continue;
-        }
-
-        ResourceId resourceId = message.getResourceId();
-        ResourceConfig resource = resourceMap.get(resourceId);
-        if (resource == null) {
-          continue;
-        }
-
-        if (!message.getBatchMessageMode()) {
-          PartitionId partitionId = message.getPartitionId();
-          Partition partition = resource.getSubUnit(partitionId);
-          if (partition != null) {
-            currentStateOutput.setPendingState(resourceId, partitionId, participantId,
-                message.getTypedToState());
-          } else {
-            // log
-          }
-        } else {
-          List<PartitionId> partitionNames = message.getPartitionIds();
-          if (!partitionNames.isEmpty()) {
-            for (PartitionId partitionId : partitionNames) {
-              Partition partition = resource.getSubUnit(partitionId);
-              if (partition != null) {
-                currentStateOutput.setPendingState(resourceId, partitionId, participantId,
-                    message.getTypedToState());
-              } else {
-                // log
-              }
-            }
-          }
-        }
-      }
-
-      // add current state
-      SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
-      Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
-      for (CurrentState curState : curStateMap.values()) {
-        if (!sessionId.equals(curState.getTypedSessionId())) {
-          continue;
-        }
-
-        ResourceId resourceId = curState.getResourceId();
-        StateModelDefId stateModelDefId = curState.getStateModelDefId();
-        ResourceConfig resource = resourceMap.get(resourceId);
-        if (resource == null) {
-          continue;
-        }
-
-        if (stateModelDefId != null) {
-          currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
-        }
-
-        currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
-
-        Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
-        for (PartitionId partitionId : partitionStateMap.keySet()) {
-          Partition partition = resource.getSubUnit(partitionId);
-          if (partition != null) {
-            currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
-                curState.getState(partitionId));
-          } else {
-            // log
-          }
-        }
-      }
-    }
-
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
deleted file mode 100644
index d67931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordDelta;
-import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.SchedulerTaskConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StatusUpdate;
-import org.apache.log4j.Logger;
-
-public class NewExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(NewExternalViewComputeStage.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    LOG.info("START ExternalViewComputeStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-
-    if (manager == null || resourceMap == null || cluster == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|DataCache");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-
-    ResourceCurrentState currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
-    List<ExternalView> newExtViews = new ArrayList<ExternalView>();
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
-
-    // TODO use external-view accessor
-    Map<String, ExternalView> curExtViews =
-        dataAccessor.getChildValuesMap(keyBuilder.externalViews());
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ExternalView view = new ExternalView(resourceId.stringify());
-      // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
-      // if resource ideal state has bucket size, set it
-      // otherwise resource has been dropped, use bucket size from current state instead
-      ResourceConfig resource = resourceMap.get(resourceId);
-      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-      SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
-
-      if (resource.getBucketSize() > 0) {
-        view.setBucketSize(resource.getBucketSize());
-      } else {
-        view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
-      }
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        Map<ParticipantId, State> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceId, partitionId);
-        if (currentStateMap != null && currentStateMap.size() > 0) {
-          // Set<String> disabledInstances
-          // = cache.getDisabledInstancesForResource(resource.toString());
-          for (ParticipantId participantId : currentStateMap.keySet()) {
-            // if (!disabledInstances.contains(instance))
-            // {
-            view.setState(partitionId.stringify(), participantId.stringify(),
-                currentStateMap.get(participantId).toString());
-            // }
-          }
-        }
-      }
-
-      // TODO fix this
-      // Update cluster status monitor mbean
-      // ClusterStatusMonitor clusterStatusMonitor =
-      // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
-      // if (idealState != null) {
-      // if (clusterStatusMonitor != null
-      // && !idealState.getStateModelDefRef().equalsIgnoreCase(
-      // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-      // clusterStatusMonitor.onExternalViewChange(view,
-      // cache._idealStateMap.get(view.getResourceName()));
-      // }
-      // }
-
-      // compare the new external view with current one, set only on different
-      ExternalView curExtView = curExtViews.get(resourceId.stringify());
-      if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
-        keys.add(keyBuilder.externalView(resourceId.stringify()));
-        newExtViews.add(view);
-
-        // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
-        // task
-        // partitions are finished (COMPLETED or ERROR), update the status update of the original
-        // scheduler
-        // message, and then remove the partitions from the ideal state
-        RebalancerContext rebalancerContext =
-            (rebalancerConfig != null) ? rebalancerConfig
-                .getRebalancerContext(RebalancerContext.class) : null;
-        if (rebalancerContext != null
-            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
-                StateModelDefId.SchedulerTaskQueue)) {
-          updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
-        }
-      }
-    }
-    // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
-    // Are there any entity that will be interested in its change?
-
-    // add/update external-views
-    if (newExtViews.size() > 0) {
-      dataAccessor.setChildren(keys, newExtViews);
-    }
-
-    // remove dead external-views
-    for (String resourceName : curExtViews.keySet()) {
-      if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
-        dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
-      }
-    }
-
-    long endTime = System.currentTimeMillis();
-    LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
-  }
-
-  // TODO fix it
-  private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
-      HelixManager manager, SchedulerTaskConfig schedulerTaskConfig) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
-
-    // Place holder for finished partitions
-    Map<String, String> emptyMap = new HashMap<String, String>();
-    List<String> emptyList = new LinkedList<String>();
-
-    Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
-    Map<String, Map<String, String>> controllerMsgUpdates =
-        new HashMap<String, Map<String, String>>();
-
-    for (String taskPartitionName : ev.getPartitionSet()) {
-      for (String taskState : ev.getStateMap(taskPartitionName).values()) {
-        if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
-            || taskState.equalsIgnoreCase("COMPLETED")) {
-          LOG.info(taskPartitionName + " finished as " + taskState);
-          finishedTasks.setListField(taskPartitionName, emptyList);
-          finishedTasks.setMapField(taskPartitionName, emptyMap);
-
-          // Update original scheduler message status update
-          Message innerMessage =
-              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
-          if (innerMessage != null) {
-            String controllerMsgId = innerMessage.getControllerMessagId();
-            if (controllerMsgId != null) {
-              LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
-              if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
-                controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
-              }
-              controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
-            }
-          }
-        }
-      }
-    }
-    // fill the controllerMsgIdCountMap
-    for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
-      Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
-      String controllerMsgId = innerMessage.getControllerMessagId();
-
-      if (controllerMsgId != null) {
-        Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
-        if (curCnt == null) {
-          curCnt = 0;
-        }
-        controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
-      }
-    }
-
-    if (controllerMsgUpdates.size() > 0) {
-      for (String controllerMsgId : controllerMsgUpdates.keySet()) {
-        PropertyKey controllerStatusUpdateKey =
-            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
-        StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
-        for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
-          Message innerMessage =
-              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
-
-          Map<String, String> result = new HashMap<String, String>();
-          result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
-          controllerStatusUpdate.getRecord().setMapField(
-              "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
-                  + innerMessage.getMessageId(), result);
-        }
-
-        // All done for the scheduled tasks that came from controllerMsgId, add summary for it
-        if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
-            controllerMsgId).intValue()) {
-          int finishedTasksNum = 0;
-          int completedTasksNum = 0;
-          for (String key : controllerStatusUpdate.getRecord().getMapFields().keySet()) {
-            if (key.startsWith("MessageResult ")) {
-              finishedTasksNum++;
-            }
-            if (controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null) {
-              if (controllerStatusUpdate.getRecord().getMapField(key).get("Result")
-                  .equalsIgnoreCase("COMPLETED")) {
-                completedTasksNum++;
-              }
-            }
-          }
-          Map<String, String> summary = new TreeMap<String, String>();
-          summary.put("TotalMessages:", "" + finishedTasksNum);
-          summary.put("CompletedMessages", "" + completedTasksNum);
-
-          controllerStatusUpdate.getRecord().setMapField("Summary", summary);
-        }
-        // Update the statusUpdate of controllerMsgId
-        accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
-      }
-    }
-
-    if (finishedTasks.getListFields().size() > 0) {
-      ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
-      List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
-      deltaList.add(znDelta);
-      IdealState delta = new IdealState(resourceId);
-      delta.setDeltaList(deltaList);
-
-      // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
-      keyBuilder = accessor.keyBuilder();
-      accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
deleted file mode 100644
index 3d51bd0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.SchedulerTaskConfig;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-public class NewMessageGenerationStage extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(NewMessageGenerationStage.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    ResourceCurrentState currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    NewBestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
-    }
-
-    NewMessageOutput output = new NewMessageOutput();
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resourceConfig = resourceMap.get(resourceId);
-      int bucketSize = resourceConfig.getBucketSize();
-
-      RebalancerContext rebalancerCtx =
-          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
-
-      ResourceAssignment resourceAssignment =
-          bestPossibleStateOutput.getResourceAssignment(resourceId);
-      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
-        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
-
-        // we should generate message based on the desired-state priority
-        // so keep generated messages in a temp map keyed by state
-        // desired-state->list of generated-messages
-        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
-
-        for (ParticipantId participantId : instanceStateMap.keySet()) {
-          State desiredState = instanceStateMap.get(participantId);
-
-          State currentState =
-              currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
-          if (currentState == null) {
-            currentState = stateModelDef.getTypedInitialState();
-          }
-
-          if (desiredState.equals(currentState)) {
-            continue;
-          }
-
-          State pendingState =
-              currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
-
-          // TODO fix it
-          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null) {
-            LOG.error("Unable to find a next state for partition: " + subUnitId
-                + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
-                + " to:" + desiredState);
-            continue;
-          }
-
-          if (pendingState != null) {
-            if (nextState.equals(pendingState)) {
-              LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
-                  + " from " + currentState + " to " + nextState);
-            } else if (currentState.equals(pendingState)) {
-              LOG.info("Message hasn't been removed for " + participantId + " to transit"
-                  + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
-            } else {
-              LOG.info("IdealState changed before state transition completes for " + subUnitId
-                  + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
-                  + currentState + ", nextState: " + nextState);
-            }
-          } else {
-            // TODO check if instance is alive
-            SessionId sessionId =
-                cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
-                    .getSessionId();
-            RebalancerContext rebalancerContext =
-                resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-            Message message =
-                createMessage(manager, resourceId, subUnitId, participantId, currentState,
-                    nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
-                    rebalancerContext.getStateModelFactoryId(), bucketSize);
-
-            // TODO refactor get/set timeout/inner-message
-            if (rebalancerContext != null
-                && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
-                    StateModelDefId.SchedulerTaskQueue)) {
-              if (resourceConfig.getSubUnitMap().size() > 0) {
-                // TODO refactor it -- we need a way to read in scheduler tasks a priori
-                Message innerMsg =
-                    resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId);
-                if (innerMsg != null) {
-                  message.setInnerMessage(innerMsg);
-                }
-              }
-            }
-
-            // Set timeout if needed
-            String stateTransition =
-                String.format("%s-%s_%s", currentState, nextState,
-                    Message.Attributes.TIMEOUT.name());
-            SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
-            if (schedulerTaskConfig != null) {
-              int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
-              if (timeout > 0) {
-                message.setExecutionTimeout(timeout);
-              }
-            }
-            message.setClusterEvent(event);
-
-            if (!messageMap.containsKey(desiredState)) {
-              messageMap.put(desiredState, new ArrayList<Message>());
-            }
-            messageMap.get(desiredState).add(message);
-          }
-        }
-
-        // add generated messages to output according to state priority
-        List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-        for (State state : statesPriorityList) {
-          if (messageMap.containsKey(state)) {
-            for (Message message : messageMap.get(state)) {
-              output.addMessage(resourceId, subUnitId, message);
-            }
-          }
-        }
-
-      } // end of for-each-partition
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
-    // System.out.println("output: " + output);
-  }
-
-  private Message createMessage(HelixManager manager, ResourceId resourceId,
-      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
-      SessionId participantSessionId, StateModelDefId stateModelDefId,
-      StateModelFactoryId stateModelFactoryId, int bucketSize) {
-    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(participantId.stringify());
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionId(partitionId);
-    message.setResourceId(resourceId);
-    message.setFromState(currentState);
-    message.setToState(nextState);
-    message.setTgtSessionId(participantSessionId);
-    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
-    message.setStateModelDef(stateModelDefId);
-    message.setStateModelFactoryId(stateModelFactoryId);
-    message.setBucketSize(bucketSize);
-
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
deleted file mode 100644
index 89231c2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.Message;
-
-public class NewMessageOutput {
-
-  private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
-
-  public NewMessageOutput() {
-    _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
-
-  }
-
-  public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
-    if (!_messagesMap.containsKey(resourceId)) {
-      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
-    }
-    if (!_messagesMap.get(resourceId).containsKey(partitionId)) {
-      _messagesMap.get(resourceId).put(partitionId, new ArrayList<Message>());
-
-    }
-    _messagesMap.get(resourceId).get(partitionId).add(message);
-
-  }
-
-  public void setMessages(ResourceId resourceId, PartitionId partitionId,
-      List<Message> selectedMessages) {
-    if (!_messagesMap.containsKey(resourceId)) {
-      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
-    }
-    _messagesMap.get(resourceId).put(partitionId, selectedMessages);
-
-  }
-
-  public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
-    Map<PartitionId, List<Message>> map = _messagesMap.get(resourceId);
-    if (map != null) {
-      return map.get(partitionId);
-    }
-    return Collections.emptyList();
-
-  }
-
-  public Map<PartitionId, List<Message>> getMessages(ResourceId resourceId) {
-    return _messagesMap.get(resourceId);
-  }
-
-  @Override
-  public String toString() {
-    return _messagesMap.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
deleted file mode 100644
index 4a46a4c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-public class NewMessageSelectionStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewMessageSelectionStage.class);
-
-  public static class Bounds {
-    private int upper;
-    private int lower;
-
-    public Bounds(int lower, int upper) {
-      this.lower = lower;
-      this.upper = upper;
-    }
-
-    public void increaseUpperBound() {
-      upper++;
-    }
-
-    public void increaseLowerBound() {
-      lower++;
-    }
-
-    public void decreaseUpperBound() {
-      upper--;
-    }
-
-    public void decreaseLowerBound() {
-      lower--;
-    }
-
-    public int getLowerBound() {
-      return lower;
-    }
-
-    public int getUpperBound() {
-      return upper;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%d-%d", lower, upper);
-    }
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    ResourceCurrentState currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
-    if (cluster == null || resourceMap == null || currentStateOutput == null
-        || messageGenOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
-    }
-
-    NewMessageOutput output = new NewMessageOutput();
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig()
-              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
-
-      // TODO have a logical model for transition
-      Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
-      Resource configResource = cluster.getResource(resourceId);
-
-      // if configResource == null, the resource has been dropped
-      Map<State, Bounds> stateConstraints =
-          computeStateConstraints(stateModelDef,
-              configResource == null ? null : configResource.getRebalancerConfig(), cluster);
-
-      // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
-        List<Message> selectedMessages =
-            selectMessages(cluster.getLiveParticipantMap(),
-                currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-                currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
-                stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
-        output.setMessages(resourceId, partitionId, selectedMessages);
-      }
-    }
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
-  }
-
-  // TODO: This method deserves its own class. The class should not understand helix but
-  // just be
-  // able to solve the problem using the algo. I think the method is following that but if
-  // we don't move it to another class its quite easy to break that contract
-  /**
-   * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
-   * group messages by state transition and sorted by priority 3) from highest priority to
-   * lowest, for each message group with the same transition add message one by one and
-   * make sure state constraint is not violated update state lower/upper-bounds when a new
-   * message is selected
-   * @param currentStates
-   * @param pendingStates
-   * @param messages
-   * @param stateConstraints
-   *          : STATE -> bound (lower:upper)
-   * @param stateTransitionPriorities
-   *          : FROME_STATE-TO_STATE -> priority
-   * @return: selected messages
-   */
-  List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
-      Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
-      List<Message> messages, Map<State, Bounds> stateConstraints,
-      final Map<String, Integer> stateTransitionPriorities, State initialState) {
-    if (messages == null || messages.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<Message> selectedMessages = new ArrayList<Message>();
-    Map<State, Bounds> bounds = new HashMap<State, Bounds>();
-
-    // count currentState, if no currentState, count as in initialState
-    for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
-      State state = initialState;
-      if (currentStates.containsKey(liveParticipantId)) {
-        state = currentStates.get(liveParticipantId);
-      }
-
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      bounds.get(state).increaseLowerBound();
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // count pendingStates
-    for (ParticipantId participantId : pendingStates.keySet()) {
-      State state = pendingStates.get(participantId);
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      // TODO: add lower bound, need to refactor pendingState to include fromState also
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // group messages based on state transition priority
-    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
-        new TreeMap<Integer, List<Message>>();
-    for (Message message : messages) {
-      State fromState = message.getTypedFromState();
-      State toState = message.getTypedToState();
-      String transition = fromState.toString() + "-" + toState.toString();
-      int priority = Integer.MAX_VALUE;
-
-      if (stateTransitionPriorities.containsKey(transition)) {
-        priority = stateTransitionPriorities.get(transition);
-      }
-
-      if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
-        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
-      }
-      messagesGroupByStateTransitPriority.get(priority).add(message);
-    }
-
-    // select messages
-    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
-      for (Message message : messageList) {
-        State fromState = message.getTypedFromState();
-        State toState = message.getTypedToState();
-
-        if (!bounds.containsKey(fromState)) {
-          LOG.error("Message's fromState is not in currentState. message: " + message);
-          continue;
-        }
-
-        if (!bounds.containsKey(toState)) {
-          bounds.put(toState, new Bounds(0, 0));
-        }
-
-        // check lower bound of fromState
-        if (stateConstraints.containsKey(fromState)) {
-          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
-          if (newLowerBound < 0) {
-            LOG.error("Number of currentState in " + fromState
-                + " is less than number of messages transiting from " + fromState);
-            continue;
-          }
-
-          if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
-            continue;
-          }
-        }
-
-        // check upper bound of toState
-        if (stateConstraints.containsKey(toState)) {
-          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
-          if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
-            continue;
-          }
-        }
-
-        selectedMessages.add(message);
-        bounds.get(fromState).increaseLowerBound();
-        bounds.get(toState).increaseUpperBound();
-      }
-    }
-
-    return selectedMessages;
-  }
-
-  /**
-   * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
-   * beginning and compute the stateConstraint instance once and re use at other places.
-   * Each IdealState must have a constraint object associated with it
-   * @param stateModelDefinition
-   * @param rebalancerConfig if rebalancerConfig == null, we can't evaluate R thus no constraints
-   * @param cluster
-   * @return
-   */
-  private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-      RebalancerConfig rebalancerConfig, Cluster cluster) {
-    ReplicatedRebalancerContext context =
-        (rebalancerConfig != null) ? rebalancerConfig
-            .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
-    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
-
-    List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
-    for (State state : statePriorityList) {
-      String numInstancesPerState =
-          cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
-              stateModelDefinition.getStateModelDefId(), state);
-      int max = -1;
-      if ("N".equals(numInstancesPerState)) {
-        max = cluster.getLiveParticipantMap().size();
-      } else if ("R".equals(numInstancesPerState)) {
-        // idealState is null when resource has been dropped,
-        // R can't be evaluated and ignore state constraints
-        if (context != null) {
-          if (context.anyLiveParticipant()) {
-            max = cluster.getLiveParticipantMap().size();
-          } else {
-            max = context.getReplicaCount();
-          }
-        }
-      } else {
-        try {
-          max = Integer.parseInt(numInstancesPerState);
-        } catch (Exception e) {
-          // use -1
-        }
-      }
-
-      if (max > -1) {
-        // if state has no constraint, will not put in map
-        stateConstraints.put(state, new Bounds(0, max));
-      }
-    }
-
-    return stateConstraints;
-  }
-
-  // TODO: if state transition priority is not provided then use lexicographical sorting
-  // so that behavior is consistent
-  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
-    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
-    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityStringList();
-    for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
-      stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
-    }
-
-    return stateTransitionPriorities;
-  }
-}


Mime
View raw message