http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 323be34..a0ee1c1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -30,87 +30,136 @@ import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDefinedState;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
*/
-@Deprecated
public class ConstraintBasedAssignment {
private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
- public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
- IdealState idealState, StateModelDefinition stateModelDef) {
- List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+ /**
+ * Get the participants that are not enabled, or that list the partition among their disabled ones
+ * @param participantMap map of all participants
+ * @param partitionId the partition to check
+ * @return filtered view (Guava Sets.filter) over a copy of the participant ids in participantMap
+ */
+ public static Set<ParticipantId> getDisabledParticipants(
+ final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+ Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
+ Set<ParticipantId> disabledParticipantsForPartition =
+ Sets.filter(participantSet, new Predicate<ParticipantId>() {
+ @Override
+ public boolean apply(ParticipantId participantId) {
+ Participant participant = participantMap.get(participantId);
+ return !participant.isEnabled()
+ || participant.getDisabledPartitionIds().contains(partitionId);
+ }
+ });
+ return disabledParticipantsForPartition;
+ }
- if (listField != null && listField.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+ /**
+ * Get an ordered list of participants that can serve a partition
+ * @param cluster cluster snapshot
+ * @param partitionId the partition to look up
+ * @param prefList configured preference list; a lone ANY_LIVEINSTANCE entry expands to all live participants
+ * @return list with most preferred participants first
+ */
+ public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
+ List<ParticipantId> prefList) {
+ if (prefList != null && prefList.size() == 1
+ && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
+ prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
Collections.sort(prefList);
- return prefList;
- } else {
- return listField;
}
+ return prefList;
}
/**
- * compute best state for resource in AUTO ideal state mode
- * @param cache
+ * Get a map of state to upper bound constraint given a cluster
+ * @param stateModelDef the state model definition to check
+ * @param resourceId the resource that is constrained
+ * @param cluster the cluster the resource belongs to
+ * @return map of state to upper bound
+ */
+ public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+ ResourceId resourceId, ClusterConfig cluster) {
+ Map<State, String> stateMap = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
+ stateMap.put(state, num);
+ }
+ return stateMap;
+ }
+
+ /**
+ * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+ * @param upperBounds map of state to upper bound
+ * @param liveParticipantSet set of live participant ids
* @param stateModelDef
- * @param instancePreferenceList
+ * @param participantPreferenceList
* @param currentStateMap
- * : instance->state for each partition
- * @param disabledInstancesForPartition
+ * : participant->state for each partition
+ * @param disabledParticipantsForPartition
* @return
*/
- public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef, List<String> instancePreferenceList,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
+ public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+ Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+ StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+ Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
- // if the ideal state is deleted, instancePreferenceList will be empty and
+ // if the resource is deleted, participantPreferenceList will be null and
// we should drop all resources.
if (currentStateMap != null) {
- for (String instance : currentStateMap.keySet()) {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ if ((participantPreferenceList == null || !participantPreferenceList
+ .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
// if dropped and not disabled, transit to DROPPED
- instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
+ participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+ } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+ participantId).equals(State.from(HelixDefinedState.ERROR)))
+ && disabledParticipantsForPartition.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
}
}
- // ideal state is deleted
- if (instancePreferenceList == null) {
- return instanceStateMap;
+ // resource is deleted
+ if (participantPreferenceList == null) {
+ return participantStateMap;
}
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- boolean assigned[] = new boolean[instancePreferenceList.size()];
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+ boolean assigned[] = new boolean[participantPreferenceList.size()];
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
int stateCount = -1;
if ("N".equals(num)) {
- Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
- liveAndEnabled.removeAll(disabledInstancesForPartition);
+ Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
+ liveAndEnabled.removeAll(disabledParticipantsForPartition);
stateCount = liveAndEnabled.size();
} else if ("R".equals(num)) {
- stateCount = instancePreferenceList.size();
+ stateCount = participantPreferenceList.size();
} else {
try {
stateCount = Integer.parseInt(num);
@@ -120,16 +169,18 @@ public class ConstraintBasedAssignment {
}
if (stateCount > -1) {
int count = 0;
- for (int i = 0; i < instancePreferenceList.size(); i++) {
- String instanceName = instancePreferenceList.get(i);
+ for (int i = 0; i < participantPreferenceList.size(); i++) {
+ ParticipantId participantId = participantPreferenceList.get(i);
boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instanceName) == null
- || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
-
- if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
- && !disabledInstancesForPartition.contains(instanceName)) {
- instanceStateMap.put(instanceName, state);
+ currentStateMap == null
+ || currentStateMap.get(participantId) == null
+ || !currentStateMap.get(participantId)
+ .equals(State.from(HelixDefinedState.ERROR));
+
+ if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
+ && !disabledParticipantsForPartition.contains(participantId)) {
+ participantStateMap.put(participantId, state);
count = count + 1;
assigned[i] = true;
if (count == stateCount) {
@@ -139,24 +190,25 @@ public class ConstraintBasedAssignment {
}
}
}
- return instanceStateMap;
+ return participantStateMap;
}
/**
* Get the number of replicas that should be in each state for a partition
+ * @param upperBounds map of state to upper bound
* @param stateModelDef StateModelDefinition object
* @param liveNodesNb number of live nodes
* @param total number of replicas
* @return state count map: state->count
*/
- public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
- int liveNodesNb, int totalReplicas) {
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+ public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+ StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+ LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
int replicas = totalReplicas;
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
if ("N".equals(num)) {
stateCountMap.put(state, liveNodesNb);
} else if ("R".equals(num)) {
@@ -179,8 +231,8 @@ public class ConstraintBasedAssignment {
}
// get state count for R
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
if ("R".equals(num)) {
stateCountMap.put(state, replicas);
// should have at most one state using R
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
deleted file mode 100644
index f703073..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.apache.helix.controller.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Collection of functions that will compute the best possible state based on the participants and
- * the rebalancer configuration of a resource.
- */
-public class NewConstraintBasedAssignment {
- private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
-
- /**
- * Get a set of disabled participants for a partition
- * @param participantMap map of all participants
- * @param partitionId the partition to check
- * @return a set of all participants that are disabled for the partition
- */
- public static Set<ParticipantId> getDisabledParticipants(
- final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
- Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
- Set<ParticipantId> disabledParticipantsForPartition =
- Sets.filter(participantSet, new Predicate<ParticipantId>() {
- @Override
- public boolean apply(ParticipantId participantId) {
- Participant participant = participantMap.get(participantId);
- return !participant.isEnabled()
- || participant.getDisabledPartitionIds().contains(partitionId);
- }
- });
- return disabledParticipantsForPartition;
- }
-
- /**
- * Get an ordered list of participants that can serve a partition
- * @param cluster cluster snapshot
- * @param partitionId the partition to look up
- * @param config rebalancing constraints
- * @return list with most preferred participants first
- */
- public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
- List<ParticipantId> prefList) {
- if (prefList != null && prefList.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
- prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
- Collections.sort(prefList);
- }
- return prefList;
- }
-
- /**
- * Get a map of state to upper bound constraint given a cluster
- * @param stateModelDef the state model definition to check
- * @param resourceId the resource that is constraint
- * @param cluster the cluster the resource belongs to
- * @return map of state to upper bound
- */
- public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
- ResourceId resourceId, ClusterConfig cluster) {
- Map<State, String> stateMap = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
- stateMap.put(state, num);
- }
- return stateMap;
- }
-
- /**
- * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
- * @param upperBounds map of state to upper bound
- * @param liveParticipantSet set of live participant ids
- * @param stateModelDef
- * @param participantPreferenceList
- * @param currentStateMap
- * : participant->state for each partition
- * @param disabledParticipantsForPartition
- * @return
- */
- public static Map<ParticipantId, State> computeAutoBestStateForPartition(
- Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
- StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
- Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
- Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
- // if the resource is deleted, instancePreferenceList will be empty and
- // we should drop all resources.
- if (currentStateMap != null) {
- for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((participantPreferenceList == null || !participantPreferenceList
- .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
- // if dropped and not disabled, transit to DROPPED
- participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
- } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
- participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
- }
- }
- }
-
- // resource is deleted
- if (participantPreferenceList == null) {
- return participantStateMap;
- }
-
- List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
- boolean assigned[] = new boolean[participantPreferenceList.size()];
-
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- int stateCount = -1;
- if ("N".equals(num)) {
- Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
- liveAndEnabled.removeAll(disabledParticipantsForPartition);
- stateCount = liveAndEnabled.size();
- } else if ("R".equals(num)) {
- stateCount = participantPreferenceList.size();
- } else {
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- logger.error("Invalid count for state:" + state + " ,count=" + num);
- }
- }
- if (stateCount > -1) {
- int count = 0;
- for (int i = 0; i < participantPreferenceList.size(); i++) {
- ParticipantId participantId = participantPreferenceList.get(i);
-
- boolean notInErrorState =
- currentStateMap == null
- || currentStateMap.get(participantId) == null
- || !currentStateMap.get(participantId)
- .equals(State.from(HelixDefinedState.ERROR));
-
- if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
- participantStateMap.put(participantId, state);
- count = count + 1;
- assigned[i] = true;
- if (count == stateCount) {
- break;
- }
- }
- }
- }
- }
- return participantStateMap;
- }
-
- /**
- * Get the number of replicas that should be in each state for a partition
- * @param upperBounds map of state to upper bound
- * @param stateModelDef StateModelDefinition object
- * @param liveNodesNb number of live nodes
- * @param total number of replicas
- * @return state count map: state->count
- */
- public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
- StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
- LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
- List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-
- int replicas = totalReplicas;
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- if ("N".equals(num)) {
- stateCountMap.put(state, liveNodesNb);
- } else if ("R".equals(num)) {
- // wait until we get the counts for all other states
- continue;
- } else {
- int stateCount = -1;
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- // LOG.error("Invalid count for state: " + state + ", count: " + num +
- // ", use -1 instead");
- }
-
- if (stateCount > 0) {
- stateCountMap.put(state, stateCount);
- replicas -= stateCount;
- }
- }
- }
-
- // get state count for R
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- if ("R".equals(num)) {
- stateCountMap.put(state, replicas);
- // should have at most one state using R
- break;
- }
- }
- return stateCountMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 9699dcb..051a2f3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,114 +20,123 @@ package org.apache.helix.controller.stages;
*/
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
/**
* For partition compute best possible (instance,state) pair based on
* IdealState,StateModel,LiveInstance
*/
-@Deprecated
public class BestPossibleStateCalcStage extends AbstractBaseStage {
- private static final Logger logger = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+ private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- logger.info("START BestPossibleStateCalcStage.process()");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("START BestPossibleStateCalcStage.process()");
+ }
- CurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
- if (currentStateOutput == null || resourceMap == null || cache == null) {
+ if (currentStateOutput == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires CURRENT_STATE|RESOURCES|DataCache");
}
BestPossibleStateOutput bestPossibleStateOutput =
- compute(event, resourceMap, currentStateOutput);
+ compute(cluster, event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
long endTime = System.currentTimeMillis();
- logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ }
}
- private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput) {
- // for each ideal state
- // read the state model def
- // for each resource
- // get the preference list
- // for each instanceName check if its alive then assign a state
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ /**
+ * Fallback for a dropped resource that still has current state (uses a null preference list)
+ * @param cluster cluster snapshot
+ * @param resourceId the resource for which to generate an assignment
+ * @param currentStateOutput full snapshot of the current state
+ * @param stateModelDef state model the resource follows
+ * @return assignment for the dropped resource; unpopulated if its mapped partitions are null
+ */
+ private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+ ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
+ ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+ Set<? extends PartitionId> mappedPartitions =
+ currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+ if (mappedPartitions == null) {
+ return partitionMapping;
+ }
+ for (PartitionId partitionId : mappedPartitions) {
+ Set<ParticipantId> disabledParticipantsForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+ Map<State, String> upperBounds =
+ ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
+ cluster.getConfig());
+ partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
+ .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+ stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ disabledParticipantsForPartition));
+ }
+ return partitionMapping;
+ }
+ private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+ Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
BestPossibleStateOutput output = new BestPossibleStateOutput();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
- for (String resourceName : resourceMap.keySet()) {
- logger.debug("Processing resource:" + resourceName);
-
- Resource resource = resourceMap.get(resourceName);
- // Ideal state may be gone. In that case we need to get the state model name
- // from the current state
- IdealState idealState = cache.getIdealState(resourceName);
-
- if (idealState == null) {
- // if ideal state is deleted, use an empty one
- logger.info("resource:" + resourceName + " does not exist anymore");
- idealState = new IdealState(resourceName);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + resourceId);
}
-
- Rebalancer rebalancer = null;
- if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && idealState.getRebalancerRef() != null) {
- String rebalancerClassName = idealState.getRebalancerRef().toString();
- logger
- .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
- try {
- rebalancer =
- (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- } catch (Exception e) {
- logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+ ResourceConfig resourceConfig = resourceMap.get(resourceId);
+ RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+ ResourceAssignment resourceAssignment = null;
+ if (rebalancerConfig != null) {
+ HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+ if (rebalancer != null) {
+ HelixManager manager = event.getAttribute("helixmanager");
+ rebalancer.init(manager);
+ resourceAssignment =
+ rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
}
}
- if (rebalancer == null) {
- if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
- rebalancer = new AutoRebalancer();
- } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- rebalancer = new SemiAutoRebalancer();
- } else {
- rebalancer = new CustomRebalancer();
- }
+ if (resourceAssignment == null) {
+ RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+ resourceAssignment =
+ mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
}
- HelixManager manager = event.getAttribute("helixmanager");
- rebalancer.init(manager);
- ResourceAssignment partitionStateAssignment =
- rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
- for (Partition partition : resource.getPartitions()) {
- Map<ParticipantId, State> newStateMap =
- partitionStateAssignment.getReplicaMap(PartitionId.from(partition.getPartitionName()));
- output.setParticipantStateMap(resourceName, partition, newStateMap);
- }
+ output.setResourceAssignment(resourceId, resourceAssignment);
}
+
return output;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 362bbb6..afcb6f7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -1,82 +1,49 @@
package org.apache.helix.controller.stages;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
-@Deprecated
-public class BestPossibleStateOutput {
- // resource->partition->instance->state
- Map<String, Map<Partition, Map<String, String>>> _dataMap;
+import com.google.common.collect.Maps;
- public BestPossibleStateOutput() {
- _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
- }
+public class BestPossibleStateOutput {
- public void setState(String resourceName, Partition resource,
- Map<String, String> bestInstanceStateMappingForResource) {
- if (!_dataMap.containsKey(resourceName)) {
- _dataMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
- }
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- map.put(resource, bestInstanceStateMappingForResource);
- }
+ Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
- public void setParticipantStateMap(String resourceName, Partition partition,
- Map<ParticipantId, State> bestInstanceStateMappingForResource) {
- Map<String, String> rawStateMap = new HashMap<String, String>();
- for (ParticipantId participantId : bestInstanceStateMappingForResource.keySet()) {
- rawStateMap.put(participantId.stringify(),
- bestInstanceStateMappingForResource.get(participantId).toString());
- }
- setState(resourceName, partition, rawStateMap);
+ public BestPossibleStateOutput() {
+ _resourceAssignmentMap = Maps.newHashMap();
}
- public Map<String, String> getInstanceStateMap(String resourceName, Partition resource) {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- if (map != null) {
- return map.get(resource);
- }
- return Collections.emptyMap();
+ /**
+ * Set the computed resource assignment for a resource
+ * @param resourceId the resource to set
+ * @param resourceAssignment the computed assignment
+ */
+ public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ _resourceAssignmentMap.put(resourceId, resourceAssignment);
}
- public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- if (map != null) {
- return map;
- }
- return Collections.emptyMap();
+ /**
+ * Get the resource assignment computed for a resource
+ * @param resourceId resource to look up
+ * @return ResourceAssignment computed by the best possible state calculation
+ */
+ public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+ return _resourceAssignmentMap.get(resourceId);
}
- public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
- return _dataMap;
+ /**
+ * Get all of the resources currently assigned
+ * @return set of assigned resource ids
+ */
+ public Set<ResourceId> getAssignedResources() {
+ return _resourceAssignmentMap.keySet();
}
@Override
public String toString() {
- return _dataMap.toString();
+ return _resourceAssignmentMap.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 64e881c..532ecb5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -23,10 +23,12 @@ import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.api.Cluster;
import org.apache.helix.api.HelixVersion;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
/**
@@ -38,16 +40,17 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (manager == null || cache == null) {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (manager == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires HelixManager | DataCache");
}
HelixManagerProperties properties = manager.getProperties();
- Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
- for (LiveInstance liveInstance : liveInstanceMap.values()) {
- HelixVersion version = liveInstance.getTypedHelixVersion();
+ // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+ Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+ for (Participant liveParticipant : liveParticipants.values()) {
+ HelixVersion version = liveParticipant.getRunningInstance().getVersion();
String participantVersion = (version != null) ? version.toString() : null;
if (!properties.isParticipantCompatible(participantVersion)) {
String errorMsg =
@@ -55,7 +58,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
+ manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
+ ", minimumSupportedParticipantVersion: "
+ properties.getProperty("minimum_supported_version.participant")
- + ", participant: " + liveInstance.getInstanceName() + ", participantVersion: "
+ + ", participant: " + liveParticipant.getId() + ", participantVersion: "
+ participantVersion;
LOG.error(errorMsg);
throw new StageException(errorMsg);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 7036512..5730289 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -22,59 +22,68 @@ package org.apache.helix.controller.stages;
import java.util.List;
import java.util.Map;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
/**
* For each LiveInstances select currentState and message whose sessionId matches
* sessionId from LiveInstance Get Partition,State for all the resources computed in
* previous State [ResourceComputationStage]
*/
-@Deprecated
public class CurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
- if (cache == null || resourceMap == null) {
+ if (cluster == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires DataCache|RESOURCE");
}
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ ResourceCurrentState currentStateOutput = new ResourceCurrentState();
- for (LiveInstance instance : liveInstances.values()) {
- String instanceName = instance.getInstanceName();
- Map<String, Message> instanceMessages = cache.getMessages(instanceName);
- for (Message message : instanceMessages.values()) {
+ for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+ ParticipantId participantId = liveParticipant.getId();
+
+ // add pending messages
+ Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
+ for (Message message : instanceMsgs.values()) {
if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
continue;
}
- if (!instance.getTypedSessionId().equals(message.getTypedTgtSessionId())) {
+
+ if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
continue;
}
+
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId.stringify());
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
if (!message.getBatchMessageMode()) {
PartitionId partitionId = message.getPartitionId();
- Partition partition = resource.getPartition(partitionId.stringify());
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
- currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
- message.getTypedToState().toString());
+ currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+ message.getTypedToState());
} else {
// log
}
@@ -82,10 +91,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
List<PartitionId> partitionNames = message.getPartitionIds();
if (!partitionNames.isEmpty()) {
for (PartitionId partitionId : partitionNames) {
- Partition partition = resource.getPartition(partitionId.stringify());
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
- currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
- message.getTypedToState().toString());
+ currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+ message.getTypedToState());
} else {
// log
}
@@ -93,43 +102,41 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
}
}
}
- }
- for (LiveInstance instance : liveInstances.values()) {
- String instanceName = instance.getInstanceName();
-
- String clientSessionId = instance.getTypedSessionId().stringify();
- Map<String, CurrentState> currentStateMap =
- cache.getCurrentState(instanceName, clientSessionId);
- for (CurrentState currentState : currentStateMap.values()) {
- if (!instance.getTypedSessionId().equals(currentState.getTypedSessionId())) {
+ // add current state
+ SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+ Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
+ for (CurrentState curState : curStateMap.values()) {
+ if (!sessionId.equals(curState.getTypedSessionId())) {
continue;
}
- String resourceName = currentState.getResourceName();
- String stateModelDefName = currentState.getStateModelDefRef();
- Resource resource = resourceMap.get(resourceName);
+
+ ResourceId resourceId = curState.getResourceId();
+ StateModelDefId stateModelDefId = curState.getStateModelDefId();
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
- if (stateModelDefName != null) {
- currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
+
+ if (stateModelDefId != null) {
+ currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
}
- currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
+ currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
- Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
- for (String partitionName : partitionStateMap.keySet()) {
- Partition partition = resource.getPartition(partitionName);
+ Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
+ for (PartitionId partitionId : partitionStateMap.keySet()) {
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
- currentStateOutput.setCurrentState(resourceName, partition, instanceName,
- currentState.getState(partitionName));
-
+ currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+ curState.getState(partitionId));
} else {
// log
}
}
}
}
+
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 3c3a9d9..55a5e54 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -34,33 +34,39 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.StatusUpdate;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
-@Deprecated
public class ExternalViewComputeStage extends AbstractBaseStage {
- private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+ private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- log.info("START ExternalViewComputeStage.process()");
+ LOG.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
- if (manager == null || resourceMap == null || cache == null) {
+ if (manager == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires ClusterManager|RESOURCES|DataCache");
}
@@ -68,58 +74,64 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
- CurrentStateOutput currentStateOutput =
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
List<ExternalView> newExtViews = new ArrayList<ExternalView>();
List<PropertyKey> keys = new ArrayList<PropertyKey>();
+ // TODO use external-view accessor
Map<String, ExternalView> curExtViews =
dataAccessor.getChildValuesMap(keyBuilder.externalViews());
- for (String resourceName : resourceMap.keySet()) {
- ExternalView view = new ExternalView(resourceName);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ExternalView view = new ExternalView(resourceId.stringify());
// view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
// if resource ideal state has bucket size, set it
// otherwise resource has been dropped, use bucket size from current state instead
- Resource resource = resourceMap.get(resourceName);
+ ResourceConfig resource = resourceMap.get(resourceId);
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
+
if (resource.getBucketSize() > 0) {
view.setBucketSize(resource.getBucketSize());
} else {
- view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+ view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
}
-
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ Map<ParticipantId, State> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId);
if (currentStateMap != null && currentStateMap.size() > 0) {
// Set<String> disabledInstances
// = cache.getDisabledInstancesForResource(resource.toString());
- for (String instance : currentStateMap.keySet()) {
+ for (ParticipantId participantId : currentStateMap.keySet()) {
// if (!disabledInstances.contains(instance))
// {
- view.setState(partition.getPartitionName(), instance, currentStateMap.get(instance));
+ view.setState(partitionId.stringify(), participantId.stringify(),
+ currentStateMap.get(participantId).toString());
// }
}
}
}
+
+ // TODO re-enable the cluster status monitor update below; it still relies on the old ClusterDataCache/IdealState
// Update cluster status monitor mbean
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- IdealState idealState = cache._idealStateMap.get(view.getResourceName());
- if (idealState != null) {
- if (clusterStatusMonitor != null
- && !idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- clusterStatusMonitor.onExternalViewChange(view,
- cache._idealStateMap.get(view.getResourceName()));
- }
- }
+ // ClusterStatusMonitor clusterStatusMonitor =
+ // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+ // if (idealState != null) {
+ // if (clusterStatusMonitor != null
+ // && !idealState.getStateModelDefRef().equalsIgnoreCase(
+ // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ // clusterStatusMonitor.onExternalViewChange(view,
+ // cache._idealStateMap.get(view.getResourceName()));
+ // }
+ // }
// compare the new external view with current one, set only on different
- ExternalView curExtView = curExtViews.get(resourceName);
+ ExternalView curExtView = curExtViews.get(resourceId.stringify());
if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
- keys.add(keyBuilder.externalView(resourceName));
+ keys.add(keyBuilder.externalView(resourceId.stringify()));
newExtViews.add(view);
// For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
@@ -127,10 +139,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// partitions are finished (COMPLETED or ERROR), update the status update of the original
// scheduler
// message, and then remove the partitions from the ideal state
- if (idealState != null
- && idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- updateScheduledTaskStatus(view, manager, idealState);
+ RebalancerContext rebalancerContext =
+ (rebalancerConfig != null) ? rebalancerConfig
+ .getRebalancerContext(RebalancerContext.class) : null;
+ if (rebalancerContext != null
+ && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+ StateModelDefId.SchedulerTaskQueue)) {
+ updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
}
}
}
@@ -144,18 +159,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// remove dead external-views
for (String resourceName : curExtViews.keySet()) {
- if (!resourceMap.keySet().contains(resourceName)) {
+ if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
}
}
long endTime = System.currentTimeMillis();
- log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+ LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
}
- private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
- IdealState taskQueueIdealState) {
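+ // TODO fix it. NOTE(review): getInnerMessage() is null-checked only at its first call site below - confirm it cannot return null at the others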
+ private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
+ HelixManager manager, SchedulerTaskConfig schedulerTaskConfig) {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
// Place holder for finished partitions
@@ -166,23 +184,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
Map<String, Map<String, String>> controllerMsgUpdates =
new HashMap<String, Map<String, String>>();
- Builder keyBuilder = accessor.keyBuilder();
-
for (String taskPartitionName : ev.getPartitionSet()) {
for (String taskState : ev.getStateMap(taskPartitionName).values()) {
if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
|| taskState.equalsIgnoreCase("COMPLETED")) {
- log.info(taskPartitionName + " finished as " + taskState);
- finishedTasks.getListFields().put(taskPartitionName, emptyList);
- finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+ LOG.info(taskPartitionName + " finished as " + taskState);
+ finishedTasks.setListField(taskPartitionName, emptyList);
+ finishedTasks.setMapField(taskPartitionName, emptyMap);
// Update original scheduler message status update
- if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) {
- String controllerMsgId =
- taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ Message innerMessage =
+ schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+ if (innerMessage != null) {
+ String controllerMsgId = innerMessage.getControllerMessageId();
if (controllerMsgId != null) {
- log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+ LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
}
@@ -193,16 +209,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
}
}
// fill the controllerMsgIdCountMap
- for (String taskId : taskQueueIdealState.getPartitionSet()) {
- String controllerMsgId =
- taskQueueIdealState.getRecord().getMapField(taskId)
- .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
+ Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
+ String controllerMsgId = innerMessage.getControllerMessageId();
+
if (controllerMsgId != null) {
- if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) {
- controllerMsgIdCountMap.put(controllerMsgId, 0);
+ Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
+ if (curCnt == null) {
+ curCnt = 0;
}
- controllerMsgIdCountMap.put(controllerMsgId,
- (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+ controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
}
}
@@ -212,18 +228,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
+ Message innerMessage =
+ schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+
Map<String, String> result = new HashMap<String, String>();
result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
controllerStatusUpdate.getRecord().setMapField(
- "MessageResult "
- + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(Message.Attributes.TGT_NAME.toString())
- + " "
- + taskPartitionName
- + " "
- + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
- .get(Message.Attributes.MSG_ID.toString()), result);
+ "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
+ + innerMessage.getMessageId(), result);
}
+
// All done for the scheduled tasks that came from controllerMsgId, add summary for it
if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
controllerMsgId).intValue()) {
@@ -255,12 +269,12 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
deltaList.add(znDelta);
- IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+ IdealState delta = new IdealState(resourceId);
delta.setDeltaList(deltaList);
// Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
keyBuilder = accessor.keyBuilder();
- accessor.updateProperty(keyBuilder.idealState(taskQueueIdealState.getResourceName()), delta);
+ accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
deleted file mode 100644
index 3056cd5..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-@Deprecated
-public class MessageGenerationPhase extends AbstractBaseStage {
- private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
- || bestPossibleStateOutput == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
- }
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- Map<String, String> sessionIdMap = new HashMap<String, String>();
-
- for (LiveInstance liveInstance : liveInstances.values()) {
- sessionIdMap
- .put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId().stringify());
- }
- MessageGenerationOutput output = new MessageGenerationOutput();
-
- for (String resourceName : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceName);
- int bucketSize = resource.getBucketSize();
-
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> instanceStateMap =
- bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
-
- // we should generate message based on the desired-state priority
- // so keep generated messages in a temp map keyed by state
- // desired-state->list of generated-messages
- Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
-
- for (String instanceName : instanceStateMap.keySet()) {
- String desiredState = instanceStateMap.get(instanceName);
-
- String currentState =
- currentStateOutput.getCurrentState(resourceName, partition, instanceName);
- if (currentState == null) {
- currentState = stateModelDef.getInitialState();
- }
-
- if (desiredState.equalsIgnoreCase(currentState)) {
- continue;
- }
-
- String pendingState =
- currentStateOutput.getPendingState(resourceName, partition, instanceName);
-
- String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
- if (nextState == null) {
- logger.error("Unable to find a next state for partition: "
- + partition.getPartitionName() + " from stateModelDefinition"
- + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
- continue;
- }
-
- if (pendingState != null) {
- if (nextState.equalsIgnoreCase(pendingState)) {
- logger.debug("Message already exists for " + instanceName + " to transit "
- + partition.getPartitionName() + " from " + currentState + " to " + nextState);
- } else if (currentState.equalsIgnoreCase(pendingState)) {
- logger.info("Message hasn't been removed for " + instanceName + " to transit"
- + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
- + desiredState);
- } else {
- logger.info("IdealState changed before state transition completes for "
- + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
- + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
- }
- } else {
- Message message =
- createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
- currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
- resource.getStateModelFactoryname(), bucketSize);
- IdealState idealState = cache.getIdealState(resourceName);
- if (idealState != null
- && idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
- message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- idealState.getRecord().getMapField(partition.getPartitionName()));
- }
- }
- // Set timeout of needed
- String stateTransition =
- currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
- if (idealState != null) {
- String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
- if (timeOutStr == null
- && idealState.getStateModelDefRef().equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // scheduled task queue
- if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
- timeOutStr =
- idealState.getRecord().getMapField(partition.getPartitionName())
- .get(Message.Attributes.TIMEOUT.toString());
- }
- }
- if (timeOutStr != null) {
- try {
- int timeout = Integer.parseInt(timeOutStr);
- if (timeout > 0) {
- message.setExecutionTimeout(timeout);
- }
- } catch (Exception e) {
- logger.error("", e);
- }
- }
- }
- message.getRecord().setSimpleField("ClusterEventName", event.getName());
- // output.addMessage(resourceName, partition, message);
- if (!messageMap.containsKey(desiredState)) {
- messageMap.put(desiredState, new ArrayList<Message>());
- }
- messageMap.get(desiredState).add(message);
- }
- }
-
- // add generated messages to output according to state priority
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- for (String state : statesPriorityList) {
- if (messageMap.containsKey(state)) {
- for (Message message : messageMap.get(state)) {
- output.addMessage(resourceName, partition, message);
- }
- }
- }
-
- } // end of for-each-partition
- }
- event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
- }
-
- private Message createMessage(HelixManager manager, String resourceName, String partitionName,
- String instanceName, String currentState, String nextState, String sessionId,
- String stateModelDefName, String stateModelFactoryName, int bucketSize) {
- MessageId uuid = MessageId.from(UUID.randomUUID().toString());
- Message message = new Message(MessageType.STATE_TRANSITION, uuid);
- message.setSrcName(manager.getInstanceName());
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionId(PartitionId.from(partitionName));
- message.setResourceId(ResourceId.from(resourceName));
- message.setFromState(State.from(currentState));
- message.setToState(State.from(nextState));
- message.setTgtSessionId(SessionId.from(sessionId));
- message.setSrcSessionId(SessionId.from(manager.getSessionId()));
- message.setStateModelDef(StateModelDefId.from(stateModelDefName));
- message.setStateModelFactoryName(stateModelFactoryName);
- message.setBucketSize(bucketSize);
-
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
new file mode 100644
index 0000000..d6fe8c3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -0,0 +1,231 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the current and pending state of every replica with its best possible state and
+ * generates the state transition messages that move it one step toward that state.
+ */
+public class MessageGenerationStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(MessageGenerationStage.class);
+
+  /**
+   * Generates the messages for all resources of the event and stores them in the event as the
+   * MESSAGES_ALL attribute.
+   * @param event pipeline event providing the manager, the cluster, the resources, the current
+   *          state and the best possible state
+   * @throws StageException if one of the required event attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ResourceCurrentState currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
+        || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+    }
+
+    // Only touch the cluster after the check above, so that a missing cluster is reported as
+    // a StageException instead of a NullPointerException
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+    MessageOutput output = new MessageOutput();
+
+    for (Map.Entry<ResourceId, ResourceConfig> resourceEntry : resourceMap.entrySet()) {
+      ResourceId resourceId = resourceEntry.getKey();
+      ResourceConfig resourceConfig = resourceEntry.getValue();
+      int bucketSize = resourceConfig.getBucketSize();
+
+      RebalancerContext rebalancerCtx =
+          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+
+      ResourceAssignment resourceAssignment =
+          bestPossibleStateOutput.getResourceAssignment(resourceId);
+      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
+
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (Map.Entry<ParticipantId, State> replica : instanceStateMap.entrySet()) {
+          ParticipantId participantId = replica.getKey();
+          State desiredState = replica.getValue();
+
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
+          if (currentState == null) {
+            currentState = stateModelDef.getTypedInitialState();
+          }
+
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
+
+          // TODO fix it
+          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + subUnitId
+                + " from stateModelDefinition " + stateModelDef.getId() + " from:" + currentState
+                + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            // A pending state exists for this replica: only log it, no message is generated
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
+                  + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit "
+                  + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + subUnitId
+                  + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
+                  + currentState + ", nextState: " + nextState);
+            }
+            continue;
+          }
+
+          // The replica may be assigned to a participant that is not in the live participant
+          // map; it has no session to address, so skip it instead of failing the whole stage
+          if (!cluster.getLiveParticipantMap().containsKey(participantId)) {
+            LOG.warn("Participant " + participantId + " is not live, skip message to transit "
+                + subUnitId + " from " + currentState + " to " + nextState);
+            continue;
+          }
+          SessionId sessionId =
+              cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                  .getSessionId();
+          Message message =
+              createMessage(manager, resourceId, subUnitId, participantId, currentState,
+                  nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
+                  rebalancerCtx.getStateModelFactoryId(), bucketSize);
+
+          // TODO refactor get/set timeout/inner-message
+          // The scheduler task config may be null, so it is checked before every use
+          SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
+          if (schedulerTaskConfig != null
+              && rebalancerCtx.getStateModelDefId().equalsIgnoreCase(
+                  StateModelDefId.SchedulerTaskQueue)) {
+            // TODO refactor it -- we need a way to read in scheduler tasks a priori
+            Message innerMsg = schedulerTaskConfig.getInnerMessage(subUnitId);
+            if (innerMsg != null) {
+              message.setInnerMessage(innerMsg);
+            }
+          }
+
+          // Set timeout if needed
+          String stateTransition =
+              String.format("%s-%s_%s", currentState, nextState,
+                  Message.Attributes.TIMEOUT.name());
+          if (schedulerTaskConfig != null) {
+            int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
+            if (timeout > 0) {
+              message.setExecutionTimeout(timeout);
+            }
+          }
+          message.setClusterEvent(event);
+
+          List<Message> messagesForState = messageMap.get(desiredState);
+          if (messagesForState == null) {
+            messagesForState = new ArrayList<Message>();
+            messageMap.put(desiredState, messagesForState);
+          }
+          messagesForState.add(message);
+        }
+
+        // add generated messages to output according to state priority
+        List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+        for (State state : statesPriorityList) {
+          List<Message> messagesForState = messageMap.get(state);
+          if (messagesForState != null) {
+            for (Message message : messagesForState) {
+              output.addMessage(resourceId, subUnitId, message);
+            }
+          }
+        }
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  /** Builds a NEW state transition message from the manager's instance to the participant. */
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId participantSessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(participantId.stringify());
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionId(partitionId);
+    message.setResourceId(resourceId);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(participantSessionId);
+    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
+    message.setStateModelDef(stateModelDefId);
+    message.setStateModelFactoryId(stateModelFactoryId);
+    message.setBucketSize(bucketSize);
+
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
new file mode 100644
index 0000000..9c8c154
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.Message;
+
+/** Messages generated by a pipeline stage, grouped by resource and then by partition. */
+public class MessageOutput {
+  private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap =
+      new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
+
+  /** Appends a message to the list kept for the given resource and partition. */
+  public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
+    Map<PartitionId, List<Message>> byPartition = getOrCreatePartitionMap(resourceId);
+    if (!byPartition.containsKey(partitionId)) {
+      byPartition.put(partitionId, new ArrayList<Message>());
+    }
+    byPartition.get(partitionId).add(message);
+  }
+
+  /** Replaces the messages of a partition; the given list is stored as-is, not copied. */
+  public void setMessages(ResourceId resourceId, PartitionId partitionId,
+      List<Message> selectedMessages) {
+    getOrCreatePartitionMap(resourceId).put(partitionId, selectedMessages);
+  }
+
+  /** Returns the messages of a partition; an empty list, never null, when there are none. */
+  public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
+    Map<PartitionId, List<Message>> byPartition = _messagesMap.get(resourceId);
+    // An unknown resource and an unknown partition are both reported as "no messages"
+    List<Message> messages = byPartition == null ? null : byPartition.get(partitionId);
+    return messages == null ? Collections.<Message> emptyList() : messages;
+  }
+
+  /** Returns the per-partition messages of a resource, or null if none were recorded for it. */
+  public Map<PartitionId, List<Message>> getMessages(ResourceId resourceId) {
+    return _messagesMap.get(resourceId);
+  }
+
+  @Override
+  public String toString() {
+    return _messagesMap.toString();
+  }
+
+  /** Returns the partition map of a resource, registering an empty one on first use. */
+  private Map<PartitionId, List<Message>> getOrCreatePartitionMap(ResourceId resourceId) {
+    Map<PartitionId, List<Message>> byPartition = _messagesMap.get(resourceId);
+    if (byPartition == null) {
+      byPartition = new HashMap<PartitionId, List<Message>>();
+      _messagesMap.put(resourceId, byPartition);
+    }
+    return byPartition;
+  }
+}
|