helix-commits mailing list archives

From ka...@apache.org
Subject [51/53] [abbrv] Merge branch 'helix-logical-model'
Date Thu, 07 Nov 2013 01:19:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index b72170f,0000000..f18e8cc
mode 100644,000000..100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@@ -1,151 -1,0 +1,162 @@@
 +package org.apache.helix.tools;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.api.id.ResourceId;
 +import org.apache.helix.controller.pipeline.Stage;
 +import org.apache.helix.controller.pipeline.StageContext;
 +import org.apache.helix.controller.stages.AttributeName;
 +import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 +import org.apache.helix.controller.stages.BestPossibleStateOutput;
 +import org.apache.helix.controller.stages.ClusterDataCache;
 +import org.apache.helix.controller.stages.ClusterEvent;
 +import org.apache.helix.controller.stages.CurrentStateComputationStage;
 +import org.apache.helix.controller.stages.ResourceComputationStage;
 +import org.apache.helix.manager.zk.ZkClient;
 +import org.apache.helix.model.ExternalView;
 +import org.apache.helix.model.Partition;
++import org.apache.helix.model.ResourceAssignment;
 +import org.apache.log4j.Logger;
 +
++import com.google.common.collect.Maps;
++
 +/**
 + * Given a ZkClient, a cluster name, and a list of expected live instances,
 + * check whether the cluster's external view has reached the best-possible state.
 + */
 +public class ClusterExternalViewVerifier extends ClusterVerifier {
 +  private static Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
 +
 +  final List<String> _expectSortedLiveNodes; // always sorted
 +
 +  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
 +      List<String> expectLiveNodes) {
 +    super(zkclient, clusterName);
 +    _expectSortedLiveNodes = expectLiveNodes;
 +    Collections.sort(_expectSortedLiveNodes);
 +  }
 +
 +  boolean verifyLiveNodes(List<String> actualLiveNodes) {
 +    Collections.sort(actualLiveNodes);
 +    return _expectSortedLiveNodes.equals(actualLiveNodes);
 +  }
 +
 +  /**
 +   * @param externalView the external view of a resource, as read from ZooKeeper
 +   * @param bestPossibleState map of partition to map of instance to state
 +   * @return true if the external view matches the best-possible state
 +   */
 +  boolean verifyExternalView(ExternalView externalView,
 +      Map<Partition, Map<String, String>> bestPossibleState) {
 +    Map<String, Map<String, String>> bestPossibleStateMap =
 +        convertBestPossibleState(bestPossibleState);
 +    // trimBestPossibleState(bestPossibleStateMap);
 +
 +    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
 +    return externalViewMap.equals(bestPossibleStateMap);
 +  }
 +
 +  static void runStage(ClusterEvent event, Stage stage) throws Exception {
 +    StageContext context = new StageContext();
 +    stage.init(context);
 +    stage.preProcess();
 +    stage.process(event);
 +    stage.postProcess();
 +  }
 +
 +  BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache) throws Exception {
 +    ClusterEvent event = new ClusterEvent("event");
 +    event.addAttribute("ClusterDataCache", cache);
 +
 +    List<Stage> stages = new ArrayList<Stage>();
 +    stages.add(new ResourceComputationStage());
 +    stages.add(new CurrentStateComputationStage());
 +    stages.add(new BestPossibleStateCalcStage());
 +
 +    for (Stage stage : stages) {
 +      runStage(event, stage);
 +    }
 +
 +    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 +  }
 +
 +  /**
 +   * remove empty map and DROPPED state from best possible state
 +   * @param bestPossibleState
 +   */
 +  // static void trimBestPossibleState(Map<String, Map<String, String>> bestPossibleState) {
 +  // Iterator<Entry<String, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
 +  // while (iter.hasNext()) {
 +  // Map.Entry<String, Map<String, String>> entry = iter.next();
 +  // Map<String, String> instanceStateMap = entry.getValue();
 +  // if (instanceStateMap.isEmpty()) {
 +  // iter.remove();
 +  // } else {
 +  // // remove instances with DROPPED state
 +  // Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
 +  // while (insIter.hasNext()) {
 +  // Map.Entry<String, String> insEntry = insIter.next();
 +  // String state = insEntry.getValue();
 +  // if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
 +  // insIter.remove();
 +  // }
 +  // }
 +  // }
 +  // }
 +  // }
 +
 +  static Map<String, Map<String, String>> convertBestPossibleState(
 +      Map<Partition, Map<String, String>> bestPossibleState) {
 +    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
 +    for (Partition partition : bestPossibleState.keySet()) {
 +      result.put(partition.getPartitionName(), bestPossibleState.get(partition));
 +    }
 +    return result;
 +  }
 +
 +  @Override
 +  public boolean verify() throws Exception {
 +    ClusterDataCache cache = new ClusterDataCache();
 +    cache.refresh(_accessor);
 +
 +    List<String> liveInstances = new ArrayList<String>();
 +    liveInstances.addAll(cache.getLiveInstances().keySet());
 +    boolean success = verifyLiveNodes(liveInstances);
 +    if (!success) {
 +      LOG.info("liveNodes do not match, expected: " + _expectSortedLiveNodes + ", actual: "
 +          + liveInstances);
 +      return false;
 +    }
 +
 +    BestPossibleStateOutput bestPossibleStates = calculateBestPossibleState(cache);
 +    Map<String, ExternalView> externalViews =
 +        _accessor.getChildValuesMap(_keyBuilder.externalViews());
 +
 +    // TODO all ideal-states should be included in external-views
 +
 +    for (String resourceName : externalViews.keySet()) {
 +      ExternalView externalView = externalViews.get(resourceName);
-       Map<Partition, Map<String, String>> bestPossibleState =
-           bestPossibleStates.getResourceMap(resourceName);
-       success = verifyExternalView(externalView, bestPossibleState);
++      ResourceAssignment assignment =
++          bestPossibleStates.getResourceAssignment(ResourceId.from(resourceName));
++      final Map<Partition, Map<String, String>> bestPossibleState = Maps.newHashMap();
++      for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
++        Map<String, String> rawStateMap =
++            ResourceAssignment.stringMapFromReplicaMap(assignment.getReplicaMap(partitionId));
++        bestPossibleState.put(new Partition(partitionId.stringify()), rawStateMap);
++      }
++      success = verifyExternalView(externalView, bestPossibleState);
 +      if (!success) {
 +        LOG.info("external-view for resource: " + resourceName + " does not match");
 +        return false;
 +      }
 +    }
 +
 +    return true;
 +  }
 +
 +}
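
For context, the verifier introduced above can be driven roughly as follows. This is a minimal sketch, not part of the commit; the ZooKeeper address, cluster name, and instance names are placeholders, and the ZNRecordSerializer is set explicitly so the verifier can read cluster data.

    import java.util.Arrays;

    import org.apache.helix.manager.zk.ZNRecordSerializer;
    import org.apache.helix.manager.zk.ZkClient;
    import org.apache.helix.tools.ClusterExternalViewVerifier;

    public class ExternalViewVerifierSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder ZooKeeper address and cluster/instance names.
        ZkClient zkclient = new ZkClient("localhost:2181");
        zkclient.setZkSerializer(new ZNRecordSerializer());
        ClusterExternalViewVerifier verifier =
            new ClusterExternalViewVerifier(zkclient, "MyCluster",
                Arrays.asList("localhost_12918", "localhost_12919"));
        // verify() recomputes the best-possible assignment and compares it to the external views.
        boolean converged = verifier.verify();
        System.out.println("cluster converged: " + converged);
        zkclient.close();
      }
    }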

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index a39e571,fa60c34..1d02275
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@@ -310,8 -310,8 +310,8 @@@ public class ClusterSetup 
          accessor.getChildValues(accessor.keyBuilder().idealStates());
      for (IdealState idealState : existingIdealStates) {
        swapInstanceInIdealState(idealState, oldInstanceName, newInstanceName);
-       accessor.setProperty(accessor.keyBuilder().idealStates(idealState.getResourceName()),
-           idealState);
+       accessor.setProperty(
 -          accessor.keyBuilder().idealState(idealState.getResourceId().stringify()), idealState);
++          accessor.keyBuilder().idealStates(idealState.getResourceId().stringify()), idealState);
      }
    }
  

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index 74d2987,1aac7e6..da438ba
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@@ -166,8 -160,8 +160,8 @@@ public class TestZKCallback extends ZkU
      IdealState idealState = new IdealState("db-1234");
      idealState.setNumPartitions(400);
      idealState.setReplicas(Integer.toString(2));
-     idealState.setStateModelDefRef("StateModeldef");
+     idealState.setStateModelDefId(StateModelDefId.from("StateModeldef"));
 -    accessor.setProperty(keyBuilder.idealState("db-1234"), idealState);
 +    accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
      Thread.sleep(100);
      AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
      testListener.Reset();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index abf75be,b626cf2..33968f7
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@@ -173,15 -177,15 +177,15 @@@ public class ZkUnitTestBase 
  
    public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
      ZKHelixDataAccessor accessor =
-         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
      Builder keyBuilder = accessor.keyBuilder();
  
 -    IdealState idealState = accessor.getProperty(keyBuilder.idealState(resource));
 +    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
-     for (String partitionName : idealState.getPartitionSet()) {
+     for (PartitionId partitionId : idealState.getPartitionIdSet()) {
        if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-         AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
+         AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionId).size());
        } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-         AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size());
+         AssertJUnit.assertEquals(repl, idealState.getParticipantStateMap(partitionId).size());
        }
      }
    }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index 0b97e20,9d1dd04..2c1aedf
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@@ -51,10 -53,10 +53,10 @@@ public class TestCompatibilityCheckStag
          DefaultTwoStateStrategy.calculateIdealState(instances, partitions, replicas, resourceName,
              "MASTER", "SLAVE");
      IdealState idealState = new IdealState(record);
-     idealState.setStateModelDefRef("MasterSlave");
+     idealState.setStateModelDefId(StateModelDefId.from("MasterSlave"));
  
      Builder keyBuilder = accessor.keyBuilder();
 -    accessor.setProperty(keyBuilder.idealState(resourceName), idealState);
 +    accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
  
      // set live instances
      record = new ZNRecord("localhost_0");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index e47032e,f8fa4a2..7c74035
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@@ -34,22 -34,20 +34,26 @@@ import java.util.TreeMap
  import java.util.TreeSet;
  
  import org.apache.helix.HelixDefinedState;
- import org.apache.helix.Mocks.MockAccessor;
- import org.apache.helix.PropertyKey.Builder;
  import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ParticipantId;
++import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
- import org.apache.helix.controller.stages.ClusterDataCache;
  import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
- import org.apache.helix.model.LiveInstance;
+ import org.apache.helix.model.IdealState;
  import org.apache.helix.model.StateModelDefinition;
 +import org.apache.helix.tools.StateModelConfigGenerator;
  import org.apache.log4j.Logger;
 +import org.testng.Assert;
  import org.testng.annotations.Test;
  
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
  import com.google.common.collect.Sets;
  
  public class TestAutoRebalanceStrategy {
@@@ -575,193 -579,4 +585,207 @@@
        return null;
      }
    }
 +
 +  /**
 +   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
 +   * lists should prefer nodes in the current mapping at all times; once all nodes are in the
 +   * current mapping, states should be distributed as evenly as possible.
 +   */
 +  @Test
 +  public void testOrphansNotPreferred() {
-     final String RESOURCE_NAME = "resource";
-     final String[] PARTITIONS = {
-         "resource_0", "resource_1", "resource_2"
-     };
++    final ResourceId RESOURCE = ResourceId.from("resource");
++    final PartitionId[] PARTITIONS =
++        {
++            PartitionId.from("resource_0"), PartitionId.from("resource_1"),
++            PartitionId.from("resource_2")
++        };
 +    final StateModelDefinition STATE_MODEL =
 +        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
 +    final int REPLICA_COUNT = 2;
-     final String[] NODES = {
-         "n0", "n1", "n2"
++    final ParticipantId[] NODES = {
++        ParticipantId.from("n0"), ParticipantId.from("n1"), ParticipantId.from("n2")
 +    };
 +
++    ReplicaPlacementScheme scheme = new AutoRebalanceStrategy.DefaultPlacementScheme();
 +    // initial state, one node, no mapping
-     List<String> allNodes = Lists.newArrayList(NODES[0]);
-     List<String> liveNodes = Lists.newArrayList(NODES[0]);
-     Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
-     for (String partition : PARTITIONS) {
-       currentMapping.put(partition, new HashMap<String, String>());
++    List<ParticipantId> allNodes = Lists.newArrayList(NODES[0]);
++    List<ParticipantId> liveNodes = Lists.newArrayList(NODES[0]);
++    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
++    for (PartitionId partition : PARTITIONS) {
++      currentMapping.put(partition, new HashMap<ParticipantId, State>());
 +    }
 +
 +    // make sure that when the first node joins, a single replica is assigned fairly
-     List<String> partitions = ImmutableList.copyOf(PARTITIONS);
-     LinkedHashMap<String, Integer> stateCount =
-         ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
++    List<PartitionId> partitions = ImmutableList.copyOf(PARTITIONS);
++    Map<State, String> upperBounds = Maps.newHashMap();
++    for (State state : STATE_MODEL.getTypedStatesPriorityList()) {
++      upperBounds.put(state, STATE_MODEL.getNumParticipantsPerState(state));
++    }
++    LinkedHashMap<State, Integer> stateCount =
++        ConstraintBasedAssignment.stateCount(upperBounds, STATE_MODEL, liveNodes.size(),
++            REPLICA_COUNT);
 +    ZNRecord znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    Map<String, List<String>> preferenceLists = znRecord.getListFields();
-     for (String partition : currentMapping.keySet()) {
++    for (PartitionId partition : currentMapping.keySet()) {
 +      // make sure these are all MASTER
-       List<String> preferenceList = preferenceLists.get(partition);
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
 +    }
 +
 +    // now assign a replica to the first node in the current mapping, and add a second node
 +    allNodes.add(NODES[1]);
 +    liveNodes.add(NODES[1]);
-     stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-     for (String partition : PARTITIONS) {
-       currentMapping.get(partition).put(NODES[0], "MASTER");
++    stateCount =
++        ConstraintBasedAssignment.stateCount(upperBounds, STATE_MODEL, liveNodes.size(),
++            REPLICA_COUNT);
++    for (PartitionId partition : PARTITIONS) {
++      currentMapping.get(partition).put(NODES[0], State.from("MASTER"));
 +    }
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-       Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
-           + partition);
-       Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
-           + partition);
++      Assert.assertEquals(preferenceList.get(0), NODES[0].toString(),
++          "invalid preference list for " + partition);
++      Assert.assertEquals(preferenceList.get(1), NODES[1].toString(),
++          "invalid preference list for " + partition);
 +    }
 +
 +    // now set the current mapping to reflect this update and make sure that it distributes masters
-     for (String partition : PARTITIONS) {
-       currentMapping.get(partition).put(NODES[1], "SLAVE");
++    for (PartitionId partition : PARTITIONS) {
++      currentMapping.get(partition).put(NODES[1], State.from("SLAVE"));
 +    }
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
 +    Set<String> firstNodes = Sets.newHashSet();
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
 +      firstNodes.add(preferenceList.get(0));
 +    }
 +    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
 +
 +    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
 +    // new node is never the most preferred
 +    allNodes.add(NODES[2]);
 +    liveNodes.add(NODES[2]);
-     stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
++    stateCount =
++        ConstraintBasedAssignment.stateCount(upperBounds, STATE_MODEL, liveNodes.size(),
++            REPLICA_COUNT);
 +
 +    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
-     currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
-     currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
++    currentMapping.get(PARTITIONS[1]).put(NODES[0], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[1], State.from("MASTER"));
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
 +    boolean newNodeUsed = false;
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-       if (preferenceList.contains(NODES[2])) {
++      if (preferenceList.contains(NODES[2].toString())) {
 +        newNodeUsed = true;
-         Assert.assertEquals(preferenceList.get(1), NODES[2],
++        Assert.assertEquals(preferenceList.get(1), NODES[2].toString(),
 +            "newly added node not at preference list tail for " + partition);
 +      }
 +    }
 +    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
 +
 +    // now remap this to take the new node into account, should go back to balancing masters, slaves
 +    // evenly across all nodes
-     for (String partition : PARTITIONS) {
++    for (PartitionId partition : PARTITIONS) {
 +      currentMapping.get(partition).clear();
 +    }
-     currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
-     currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
-     currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-     currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-     currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
-     currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
++    currentMapping.get(PARTITIONS[0]).put(NODES[0], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[0]).put(NODES[1], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[1], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[2], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[2]).put(NODES[0], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[2]).put(NODES[2], State.from("SLAVE"));
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
 +    firstNodes.clear();
 +    Set<String> secondNodes = Sets.newHashSet();
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
 +      firstNodes.add(preferenceList.get(0));
 +      secondNodes.add(preferenceList.get(1));
 +    }
 +    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
 +    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
 +
 +    // remove a node now, but use the current mapping with everything balanced just prior
 +    liveNodes.remove(0);
-     stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
++    stateCount =
++        ConstraintBasedAssignment.stateCount(upperBounds, STATE_MODEL, liveNodes.size(),
++            REPLICA_COUNT);
 +
 +    // remove all references of n0 from the mapping, keep everything else in a legal state
-     for (String partition : PARTITIONS) {
++    for (PartitionId partition : PARTITIONS) {
 +      currentMapping.get(partition).clear();
 +    }
-     currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-     currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-     currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-     currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
++    currentMapping.get(PARTITIONS[0]).put(NODES[1], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[1], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[2], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[2]).put(NODES[2], State.from("MASTER"));
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-       Map<String, String> stateMap = currentMapping.get(partition);
-       for (String participant : stateMap.keySet()) {
-         Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
-             + partition);
++      Map<ParticipantId, State> stateMap = currentMapping.get(partition);
++      for (ParticipantId participant : stateMap.keySet()) {
++        Assert.assertTrue(preferenceList.contains(participant.toString()),
++            "minimal movement violated for " + partition);
 +      }
 +      for (String participant : preferenceList) {
-         if (!stateMap.containsKey(participant)) {
++        if (!stateMap.containsKey(ParticipantId.from(participant))) {
 +          Assert.assertNotSame(preferenceList.get(0), participant,
 +              "newly moved replica should not be master for " + partition);
 +        }
 +      }
 +    }
 +
 +    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
-     for (String partition : PARTITIONS) {
++    for (PartitionId partition : PARTITIONS) {
 +      currentMapping.get(partition).clear();
 +    }
-     currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-     currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
-     currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
-     currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
-     currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
-     currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
++    currentMapping.get(PARTITIONS[0]).put(NODES[1], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[0]).put(NODES[2], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[1], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[1]).put(NODES[2], State.from("MASTER"));
++    currentMapping.get(PARTITIONS[2]).put(NODES[1], State.from("SLAVE"));
++    currentMapping.get(PARTITIONS[2]).put(NODES[2], State.from("MASTER"));
 +    znRecord =
-         new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-             .computePartitionAssignment(liveNodes, currentMapping, allNodes);
++        new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
++            .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
 +    preferenceLists = znRecord.getListFields();
 +    firstNodes.clear();
-     for (String partition : currentMapping.keySet()) {
-       List<String> preferenceList = preferenceLists.get(partition);
++    for (PartitionId partition : currentMapping.keySet()) {
++      List<String> preferenceList = preferenceLists.get(partition.toString());
 +      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
 +      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
 +      firstNodes.add(preferenceList.get(0));
 +    }
 +    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
 +  }
  }
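
The scenario described in the testOrphansNotPreferred() javadoc exercises the reworked, typed strategy API several times over. Condensed into one helper (assuming the same imports as the test above; the method name and parameters are illustrative, not new API), the call pattern is:

      // Condensed form of the rebalance call repeated throughout testOrphansNotPreferred().
      static Map<String, List<String>> computePreferenceLists(ResourceId resourceId,
          List<PartitionId> partitionIds, StateModelDefinition stateModelDef, int replicaCount,
          List<ParticipantId> liveNodes, List<ParticipantId> allNodes,
          Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
        // Per-state upper bounds, derived from the typed state priority list as in the test.
        Map<State, String> upperBounds = Maps.newHashMap();
        for (State state : stateModelDef.getTypedStatesPriorityList()) {
          upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
        }
        LinkedHashMap<State, Integer> stateCount =
            ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, liveNodes.size(),
                replicaCount);
        ReplicaPlacementScheme scheme = new AutoRebalanceStrategy.DefaultPlacementScheme();
        ZNRecord record =
            new AutoRebalanceStrategy(resourceId, partitionIds, stateCount, Integer.MAX_VALUE, scheme)
                .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
        // List fields of the returned ZNRecord are the per-partition preference lists.
        return record.getListFields();
      }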

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 33938ad,d6c6d01..ba4eee2
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@@ -86,9 -84,9 +84,9 @@@ public class TestAddStateModelFactoryAf
      ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
      Builder keyBuilder = accessor.keyBuilder();
 -    IdealState idealState = accessor.getProperty(keyBuilder.idealState("TestDB1"));
 +    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB1"));
-     idealState.setStateModelFactoryName("TestDB1_Factory");
+     idealState.setStateModelFactoryId(StateModelFactoryId.from("TestDB1_Factory"));
 -    accessor.setProperty(keyBuilder.idealState("TestDB1"), idealState);
 +    accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
      setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
  
      // assert that we have received OFFLINE->SLAVE messages for all partitions

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 1943364,c74d654..0e7f4fa
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@@ -255,21 -259,29 +259,29 @@@ public class TestAutoRebalance extends 
      @Override
      public boolean verify() {
        HelixDataAccessor accessor =
-           new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+           new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
        Builder keyBuilder = accessor.keyBuilder();
-       int numberOfPartitions =
-           accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
-               .size();
-       ClusterDataCache cache = new ClusterDataCache();
-       cache.refresh(accessor);
-       String masterValue =
-           cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
-               .getStatesPriorityList().get(0);
-       int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
-       String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
 -      IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
++      IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
+       if (idealState == null) {
+         return false;
+       }
+ 
+       int numberOfPartitions = idealState.getRecord().getListFields().size();
+       String stateModelDefName = idealState.getStateModelDefId().stringify();
+       StateModelDefinition stateModelDef =
+           accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+       State masterValue = stateModelDef.getTypedStatesPriorityList().get(0);
+       int replicas = Integer.parseInt(idealState.getReplicas());
+ 
+       String instanceGroupTag = idealState.getInstanceGroupTag();
+ 
        int instances = 0;
-       for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-         if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+       Map<String, LiveInstance> liveInstanceMap =
+           accessor.getChildValuesMap(keyBuilder.liveInstances());
+       Map<String, InstanceConfig> instanceConfigMap =
+           accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+       for (String liveInstanceName : liveInstanceMap.keySet()) {
+         if (instanceConfigMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
            instances++;
          }
        }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 32cafcf,7228cef..3523461
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@@ -222,20 -226,20 +226,20 @@@ public class TestAutoRebalancePartition
      @Override
      public boolean verify() {
        HelixDataAccessor accessor =
-           new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+           new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
        Builder keyBuilder = accessor.keyBuilder();
-       int numberOfPartitions =
-           accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
-               .size();
-       ClusterDataCache cache = new ClusterDataCache();
-       cache.refresh(accessor);
-       String masterValue =
-           cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
-               .getStatesPriorityList().get(0);
-       int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
 -      IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
++      IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
+       int numberOfPartitions = idealState.getRecord().getListFields().size();
+       String stateModelDefName = idealState.getStateModelDefId().stringify();
+       StateModelDefinition stateModelDef =
+           accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+       State masterValue = stateModelDef.getTypedStatesPriorityList().get(0);
+       Map<String, LiveInstance> liveInstanceMap =
+           accessor.getChildValuesMap(keyBuilder.liveInstances());
+       int replicas = Integer.parseInt(idealState.getReplicas());
        return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName))
-           .getRecord(), numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(),
-           cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
+           .getRecord(), numberOfPartitions, masterValue.toString(), replicas,
+           liveInstanceMap.size(), idealState.getMaxPartitionsPerInstance());
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index bf2de1e,ede4e12..9af6ca8
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@@ -224,10 -222,13 +222,13 @@@ public class TestBatchMessage extends Z
      ZKHelixDataAccessor accessor =
          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
      Builder keyBuilder = accessor.keyBuilder();
 -    IdealState idealState = accessor.getProperty(keyBuilder.idealState("TestDB0"));
 +    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
      idealState.setBatchMessageMode(true);
 -    accessor.setProperty(keyBuilder.idealState("TestDB0"), idealState);
 +    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
  
+     final String hostToFail = "localhost_12921";
+     final String partitionToFail = "TestDB0_4";
+ 
      TestHelper
          .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
      for (int i = 0; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 55fc876,6b79b61..2c0badc
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@@ -112,10 -117,10 +117,10 @@@ public class TestCustomizedIdealStateRe
      for (String partition : ev.getPartitionSet()) {
        Assert.assertEquals(ev.getStateMap(partition).size(), 1);
      }
 -    IdealState is = accessor.getProperty(keyBuilder.idealState(db2));
 +    IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
-     for (String partition : is.getPartitionSet()) {
+     for (PartitionId partition : is.getPartitionIdSet()) {
        Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
-       Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
+       Assert.assertEquals(is.getParticipantStateMap(partition).size(), 0);
      }
      Assert.assertTrue(testRebalancerCreated);
      Assert.assertTrue(testRebalancerInvoked);
@@@ -136,21 -141,23 +141,23 @@@
      public boolean verify() {
        try {
          HelixDataAccessor accessor =
-             new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+             new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
          Builder keyBuilder = accessor.keyBuilder();
-         int numberOfPartitions =
-             accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
-                 .size();
-         ClusterDataCache cache = new ClusterDataCache();
-         cache.refresh(accessor);
-         String masterValue =
-             cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
-                 .getStatesPriorityList().get(0);
-         int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
-         String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
 -        IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName));
++        IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
+         int numberOfPartitions = idealState.getRecord().getListFields().size();
+         String stateModelDefName = idealState.getStateModelDefId().stringify();
+         StateModelDefinition stateModelDef =
+             accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName));
+         State masterValue = stateModelDef.getTypedStatesPriorityList().get(0);
+         int replicas = Integer.parseInt(idealState.getReplicas());
+         String instanceGroupTag = idealState.getInstanceGroupTag();
          int instances = 0;
-         for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-           if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+         Map<String, LiveInstance> liveInstanceMap =
+             accessor.getChildValuesMap(keyBuilder.liveInstances());
+         Map<String, InstanceConfig> instanceCfgMap =
+             accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+         for (String liveInstanceName : liveInstanceMap.keySet()) {
+           if (instanceCfgMap.get(liveInstanceName).containsTag(instanceGroupTag)) {
              instances++;
            }
          }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 0000000,6f148cc..43e0250
mode 000000,100644..100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@@ -1,0 -1,151 +1,153 @@@
+ package org.apache.helix.integration;
+ 
+ import java.util.Arrays;
+ import java.util.Date;
+ import java.util.Map;
+ 
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.HelixController;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixParticipant;
+ import org.apache.helix.NotificationContext;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.ZkUnitTestBase;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.config.ParticipantConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ControllerId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.StateModelDefId;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.helix.participant.statemachine.HelixStateModelFactory;
+ import org.apache.helix.participant.statemachine.StateModel;
+ import org.apache.helix.participant.statemachine.StateModelInfo;
+ import org.apache.helix.participant.statemachine.Transition;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ public class TestHelixConnection extends ZkUnitTestBase {
+   private static final Logger LOG = Logger.getLogger(TestHelixConnection.class.getName());
+ 
+   @StateModelInfo(initialState = "OFFLINE", states = {
+       "MASTER", "SLAVE", "OFFLINE", "ERROR"
+   })
+   public static class MockStateModel extends StateModel {
+     public MockStateModel() {
+ 
+     }
+ 
+     @Transition(to = "*", from = "*")
+     public void onBecomeAnyFromAny(Message message, NotificationContext context) {
+       String from = message.getFromState();
+       String to = message.getToState();
+       LOG.info("Become " + to + " from " + from);
+     }
+   }
+ 
+   public static class MockStateModelFactory extends HelixStateModelFactory<MockStateModel> {
+ 
+     public MockStateModelFactory() {
+     }
+ 
+     @Override
+     public MockStateModel createNewStateModel(PartitionId partitionId) {
+       MockStateModel model = new MockStateModel();
+ 
+       return model;
+     }
+   }
+ 
+   @Test
+   public void test() throws Exception {
+     String className = TestHelper.getTestClassName();
+     String methodName = TestHelper.getTestMethodName();
+     String clusterName = className + "_" + methodName;
+ 
+     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ 
+     String zkAddr = ZK_ADDR;
+     ClusterId clusterId = ClusterId.from(clusterName);
+     ControllerId controllerId = ControllerId.from("controller");
+     final ParticipantId participantId = ParticipantId.from("participant1");
+ 
+     ResourceId resourceId = ResourceId.from("testDB");
+     State master = State.from("MASTER");
+     State slave = State.from("SLAVE");
+     State offline = State.from("OFFLINE");
++    State dropped = State.from("DROPPED");
+     StateModelDefId stateModelDefId = StateModelDefId.from("MasterSlave");
+ 
+     // create connection
+     HelixConnection connection = new ZkHelixConnection(zkAddr);
+     connection.connect();
+ 
+     // setup cluster
+     ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+     clusterAccessor.dropCluster();
+ 
+     StateModelDefinition stateModelDef =
+         new StateModelDefinition.Builder(stateModelDefId).addState(master, 1).addState(slave, 2)
 -            .addState(offline, 3).addTransition(offline, slave, 3).addTransition(slave, offline, 4)
 -            .addTransition(slave, master, 2).addTransition(master, slave, 1).initialState(offline)
++            .addState(offline, 3).addState(dropped).addTransition(offline, slave, 3)
++            .addTransition(slave, offline, 4).addTransition(slave, master, 2)
++            .addTransition(master, slave, 1).addTransition(offline, dropped).initialState(offline)
+             .upperBound(master, 1).dynamicUpperBound(slave, "R").build();
+     RebalancerContext rebalancerCtx =
+         new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+             .stateModelDefId(stateModelDefId)
+             .preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
+     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+         stateModelDef).build());
+     clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+         rebalancerCtx).build());
+     clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
+ 
+     // start controller
+     HelixController controller = connection.createController(clusterId, controllerId);
+     controller.startAsync();
+ 
+     // start participant
+     HelixParticipant participant = connection.createParticipant(clusterId, participantId);
+     participant.getStateMachineEngine().registerStateModelFactory(
+         StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
+ 
+     participant.startAsync();
+ 
+     // verify
+     final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
+     final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+     boolean success = TestHelper.verify(new TestHelper.Verifier() {
+ 
+       @Override
+       public boolean verify() throws Exception {
+         ExternalView externalView = accessor.getProperty(keyBuilder.externalView("testDB"));
+         Map<ParticipantId, State> stateMap = externalView.getStateMap(PartitionId.from("testDB_0"));
+ 
+         if (stateMap == null || !stateMap.containsKey(participantId)) {
+           return false;
+         }
+ 
+         return stateMap.get(participantId).equals(State.from("MASTER"));
+       }
+     }, 10 * 1000);
+ 
+     Assert.assertTrue(success);
+ 
+     // clean up
+     controller.stopAsync();
+     participant.stopAsync();
+ 
+     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
index c3133cc,cba5459..e04cc79
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@@ -57,10 -58,10 +58,10 @@@ public class TestRenamePartition extend
  
    // rename partition name TestDB0_0 to TestDB0_100
      ZKHelixDataAccessor accessor =
-         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
      Builder keyBuilder = accessor.keyBuilder();
  
 -    IdealState idealState = accessor.getProperty(keyBuilder.idealState("TestDB0"));
 +    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
  
      List<String> prioList = idealState.getRecord().getListFields().remove("TestDB0_0");
      idealState.getRecord().getListFields().put("TestDB0_100", prioList);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------
diff --cc helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index c58f94d,a2c7601..6d2b732
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@@ -650,14 -652,14 +652,14 @@@ public class TestHelixAdminCli extends 
      command = "-zkSvr localhost:2183 -rebalance " + clusterName + " db_11 2 -instanceGroupTag tag1";
      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
  
 -    IdealState dbIs = accessor.getProperty(accessor.keyBuilder().idealState("db_11"));
 +    IdealState dbIs = accessor.getProperty(accessor.keyBuilder().idealStates("db_11"));
-     Set<String> hosts = new HashSet<String>();
-     for (String p : dbIs.getPartitionSet()) {
-       for (String hostName : dbIs.getInstanceStateMap(p).keySet()) {
+     Set<ParticipantId> hosts = new HashSet<ParticipantId>();
+     for (PartitionId p : dbIs.getPartitionIdSet()) {
+       for (ParticipantId participantId : dbIs.getParticipantStateMap(p).keySet()) {
          InstanceConfig config =
-             accessor.getProperty(accessor.keyBuilder().instanceConfig(hostName));
+             accessor.getProperty(accessor.keyBuilder().instanceConfig(participantId.stringify()));
          Assert.assertTrue(config.containsTag("tag1"));
-         hosts.add(hostName);
+         hosts.add(participantId);
        }
      }
      Assert.assertEquals(hosts.size(), 2);
@@@ -672,14 -674,14 +674,14 @@@
      command = "-zkSvr localhost:2183 -rebalance " + clusterName + " db_11 3 -instanceGroupTag tag2";
      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
  
 -    dbIs = accessor.getProperty(accessor.keyBuilder().idealState("db_11"));
 +    dbIs = accessor.getProperty(accessor.keyBuilder().idealStates("db_11"));
-     hosts = new HashSet<String>();
-     for (String p : dbIs.getPartitionSet()) {
-       for (String hostName : dbIs.getInstanceStateMap(p).keySet()) {
+     hosts = new HashSet<ParticipantId>();
+     for (PartitionId p : dbIs.getPartitionIdSet()) {
+       for (ParticipantId participantId : dbIs.getParticipantStateMap(p).keySet()) {
          InstanceConfig config =
-             accessor.getProperty(accessor.keyBuilder().instanceConfig(hostName));
+             accessor.getProperty(accessor.keyBuilder().instanceConfig(participantId.stringify()));
          Assert.assertTrue(config.containsTag("tag2"));
-         hosts.add(hostName);
+         hosts.add(participantId);
        }
      }
      Assert.assertEquals(hosts.size(), 4);
@@@ -700,14 -702,14 +702,14 @@@
  
      command = "-zkSvr localhost:2183 -rebalance " + clusterName + " db_11 3 -instanceGroupTag tag2";
      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
 -    dbIs = accessor.getProperty(accessor.keyBuilder().idealState("db_11"));
 +    dbIs = accessor.getProperty(accessor.keyBuilder().idealStates("db_11"));
-     hosts = new HashSet<String>();
-     for (String p : dbIs.getPartitionSet()) {
-       for (String hostName : dbIs.getInstanceStateMap(p).keySet()) {
+     hosts = new HashSet<ParticipantId>();
+     for (PartitionId p : dbIs.getPartitionIdSet()) {
+       for (ParticipantId participantId : dbIs.getParticipantStateMap(p).keySet()) {
          InstanceConfig config =
-             accessor.getProperty(accessor.keyBuilder().instanceConfig(hostName));
+             accessor.getProperty(accessor.keyBuilder().instanceConfig(participantId.stringify()));
          Assert.assertTrue(config.containsTag("tag2"));
-         hosts.add(hostName);
+         hosts.add(participantId);
        }
      }
      Assert.assertEquals(hosts.size(), 3);
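
The three hunks above make the same change: TestHelixAdminCli now walks the ideal state through the typed PartitionId/ParticipantId accessors instead of raw strings, converting back with stringify() only where the key builder still expects a plain instance name. A minimal sketch of that pattern in isolation, assuming a HelixDataAccessor is already wired up (the class and variable names below are illustrative, not the test's real fixtures):

import java.util.HashSet;
import java.util.Set;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;

public class TaggedHostCollector {
  // Collect every participant that hosts at least one partition of the ideal state.
  static Set<ParticipantId> collectHosts(HelixDataAccessor accessor, IdealState idealState) {
    Set<ParticipantId> hosts = new HashSet<ParticipantId>();
    for (PartitionId partition : idealState.getPartitionIdSet()) {
      for (ParticipantId participant : idealState.getParticipantStateMap(partition).keySet()) {
        // instanceConfig(...) still takes the raw instance name, hence stringify()
        InstanceConfig config =
            accessor.getProperty(accessor.keyBuilder().instanceConfig(participant.stringify()));
        if (config != null) {
          hosts.add(participant);
        }
      }
    }
    return hosts;
  }
}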

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --cc helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
index e023cf9,57c61cd..840a963
--- a/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@@ -38,11 -37,8 +38,10 @@@ import org.apache.helix.model.Message.M
  import org.apache.helix.participant.StateMachineEngine;
  import org.apache.helix.participant.statemachine.StateModel;
  import org.apache.helix.participant.statemachine.StateModelFactory;
- import org.apache.helix.tools.ClusterStateVerifier;
 +import org.apache.log4j.Logger;
  
  public class ExampleProcess {
 +  private static final Logger LOG = Logger.getLogger(ExampleProcess.class);
  
    public static final String zkServer = "zkSvr";
    public static final String cluster = "cluster";

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
----------------------------------------------------------------------
diff --cc recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
index cede270,cede270..ab423f4
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockFactory.java
@@@ -19,11 -19,11 +19,12 @@@ package org.apache.helix.lockmanager
   * under the License.
   */
  
--import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
  
--public class LockFactory extends StateModelFactory<Lock> {
++public class LockFactory extends HelixStateModelFactory<Lock> {
    @Override
--  public Lock createNewStateModel(String lockName) {
--    return new Lock(lockName);
++  public Lock createNewStateModel(PartitionId lockName) {
++    return new Lock(lockName.toString());
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockManagerDemo.java
----------------------------------------------------------------------
diff --cc recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockManagerDemo.java
index b6c54db,b6c54db..e105b49
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockManagerDemo.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockManagerDemo.java
@@@ -56,9 -56,9 +56,8 @@@ public class LockManagerDemo 
        startLocalZookeeper(2199);
        HelixAdmin admin = new ZKHelixAdmin(zkAddress);
        admin.addCluster(clusterName, true);
--      StateModelConfigGenerator generator = new StateModelConfigGenerator();
--      admin.addStateModelDef(clusterName, "OnlineOffline",
--          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
++      admin.addStateModelDef(clusterName, "OnlineOffline", new StateModelDefinition(
++          StateModelConfigGenerator.generateConfigForOnlineOffline()));
        admin.addResource(clusterName, lockGroupName, numPartitions, "OnlineOffline",
            RebalanceMode.FULL_AUTO.toString());
        admin.rebalance(clusterName, lockGroupName, 1);
@@@ -148,7 -148,7 +147,7 @@@
        }
      };
      int zkPort = 2199;
--    final String zkAddress = "localhost:" + zkPort;
++    // final String zkAddress = "localhost:" + zkPort;
  
      server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
      server.start();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockProcess.java
----------------------------------------------------------------------
diff --cc recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockProcess.java
index 560f04f,560f04f..89b8b72
--- a/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockProcess.java
+++ b/recipes/distributed-lock-manager/src/main/java/org/apache/helix/lockmanager/LockProcess.java
@@@ -24,6 -24,6 +24,7 @@@ import java.util.List
  import org.apache.helix.HelixManager;
  import org.apache.helix.HelixManagerFactory;
  import org.apache.helix.InstanceType;
++import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.controller.HelixControllerMain;
  import org.apache.helix.manager.zk.ZKHelixAdmin;
  import org.apache.helix.model.InstanceConfig;
@@@ -50,8 -50,8 +51,8 @@@ public class LockProcess 
      participantManager =
          HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
              zkAddress);
--    participantManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
--        new LockFactory());
++    participantManager.getStateMachineEngine().registerStateModelFactory(
++        StateModelDefId.from("OnlineOffline"), new LockFactory());
      participantManager.connect();
      if (startController) {
        startController();
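
LockProcess now registers its factory under a typed StateModelDefId rather than a bare string. A minimal participant-side sketch of the updated registration, assuming the cluster and instance already exist (the ZK address, cluster, and instance names are placeholders):

import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.api.id.StateModelDefId;

public class LockParticipant {
  public static void main(String[] args) throws Exception {
    String zkAddress = "localhost:2199";       // placeholder
    String clusterName = "lock-manager-demo";  // placeholder
    String instanceName = "localhost_12000";   // placeholder

    HelixManager participantManager =
        HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
            InstanceType.PARTICIPANT, zkAddress);
    // the string state model name is wrapped in a StateModelDefId before registration;
    // LockFactory is the recipe class shown in the diff above (same package assumed)
    participantManager.getStateMachineEngine().registerStateModelFactory(
        StateModelDefId.from("OnlineOffline"), new LockFactory());
    participantManager.connect();
  }
}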

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
----------------------------------------------------------------------
diff --cc recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
index 48be0a8,48be0a8..0b164b3
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@@ -24,12 -24,12 +24,12 @@@ import java.util.List
  import org.apache.helix.HelixManager;
  import org.apache.helix.HelixManagerFactory;
  import org.apache.helix.InstanceType;
++import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.manager.zk.ZKHelixAdmin;
  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.ZkClient;
  import org.apache.helix.model.InstanceConfig;
  import org.apache.helix.participant.StateMachineEngine;
--import org.apache.helix.participant.statemachine.StateModel;
  
  public class Consumer {
    private final String _zkAddr;
@@@ -54,7 -54,7 +54,8 @@@
        StateMachineEngine stateMach = _manager.getStateMachineEngine();
        ConsumerStateModelFactory modelFactory =
            new ConsumerStateModelFactory(_consumerId, _mqServer);
--      stateMach.registerStateModelFactory(SetupConsumerCluster.DEFAULT_STATE_MODEL, modelFactory);
++      stateMach.registerStateModelFactory(
++          StateModelDefId.from(SetupConsumerCluster.DEFAULT_STATE_MODEL), modelFactory);
  
        _manager.connect();
  

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
----------------------------------------------------------------------
diff --cc recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
index c59e9c4,c59e9c4..98cce35
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModelFactory.java
@@@ -19,9 -19,9 +19,10 @@@ package org.apache.helix.recipes.rabbit
   * under the License.
   */
  
--import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
  
--public class ConsumerStateModelFactory extends StateModelFactory<ConsumerStateModel> {
++public class ConsumerStateModelFactory extends HelixStateModelFactory<ConsumerStateModel> {
    private final String _consumerId;
    private final String _mqServer;
  
@@@ -31,8 -31,8 +32,9 @@@
    }
  
    @Override
--  public ConsumerStateModel createNewStateModel(String partition) {
--    ConsumerStateModel model = new ConsumerStateModel(_consumerId, partition, _mqServer);
++  public ConsumerStateModel createNewStateModel(PartitionId partition) {
++    ConsumerStateModel model =
++        new ConsumerStateModel(_consumerId, partition.stringify(), _mqServer);
      return model;
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
----------------------------------------------------------------------
diff --cc recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
index bfc3149,bfc3149..6288694
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/SetupConsumerCluster.java
@@@ -52,9 -52,9 +52,8 @@@ public class SetupConsumerCluster 
        admin.addCluster(clusterName, true);
  
        // add state model definition
--      StateModelConfigGenerator generator = new StateModelConfigGenerator();
--      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL,
--          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
++      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
++          StateModelConfigGenerator.generateConfigForOnlineOffline()));
  
        // add resource "topic" which has 6 partitions
        String resourceName = DEFAULT_RESOURCE_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
----------------------------------------------------------------------
diff --cc recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
index 88ea7a2,6448411..42f477d
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStore.java
@@@ -22,12 -22,7 +22,8 @@@ package org.apache.helix.filestore
  import org.apache.helix.HelixManager;
  import org.apache.helix.HelixManagerFactory;
  import org.apache.helix.InstanceType;
- import org.apache.helix.manager.zk.ZKHelixAdmin;
- import org.apache.helix.manager.zk.ZNRecordSerializer;
++import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.manager.zk.ZkClient;
- import org.apache.helix.model.InstanceConfig;
  import org.apache.helix.participant.StateMachineEngine;
  
  public class FileStore {
@@@ -50,7 -45,7 +46,8 @@@
  
        StateMachineEngine stateMach = _manager.getStateMachineEngine();
        FileStoreStateModelFactory modelFactory = new FileStoreStateModelFactory(_manager);
--      stateMach.registerStateModelFactory(SetupCluster.DEFAULT_STATE_MODEL, modelFactory);
++      stateMach.registerStateModelFactory(StateModelDefId.from(SetupCluster.DEFAULT_STATE_MODEL),
++          modelFactory);
        _manager.connect();
        // _manager.addExternalViewChangeListener(replicator);
        Thread.currentThread().join();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
----------------------------------------------------------------------
diff --cc recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
index 4df8e3d,4df8e3d..7e1938c
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/FileStoreStateModelFactory.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.helix.filestore
   */
  
  import org.apache.helix.HelixManager;
--import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
  
--public class FileStoreStateModelFactory extends StateModelFactory<FileStoreStateModel> {
++public class FileStoreStateModelFactory extends HelixStateModelFactory<FileStoreStateModel> {
    private final HelixManager manager;
  
    public FileStoreStateModelFactory(HelixManager manager) {
@@@ -30,9 -30,9 +31,10 @@@
    }
  
    @Override
--  public FileStoreStateModel createNewStateModel(String partition) {
++  public FileStoreStateModel createNewStateModel(PartitionId partition) {
      FileStoreStateModel model;
--    model = new FileStoreStateModel(manager, partition.split("_")[0], partition);
++    model =
++        new FileStoreStateModel(manager, partition.toString().split("_")[0], partition.toString());
      return model;
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
----------------------------------------------------------------------
diff --cc recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
index 5c756ce,5c756ce..d6482a2
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
@@@ -55,9 -55,9 +55,8 @@@ public class SetupCluster 
        admin.addCluster(clusterName, true);
  
        // add state model definition
--      StateModelConfigGenerator generator = new StateModelConfigGenerator();
--      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL,
--          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
++      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
++          StateModelConfigGenerator.generateConfigForOnlineOffline()));
        // addNodes
        for (int i = 0; i < numNodes; i++) {
          String port = "" + (12001 + i);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --cc recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
index 0864ced,0864ced..6948237
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/TaskStateModelFactory.java
@@@ -19,9 -19,9 +19,10 @@@ package org.apache.helix.taskexecution
   * under the License.
   */
  
--import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
  
--public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
++public class TaskStateModelFactory extends HelixStateModelFactory<TaskStateModel> {
    private final String _workerId;
    private final TaskFactory _taskFactory;
    private TaskResultStore _taskResultStore;
@@@ -34,8 -34,8 +35,9 @@@
    }
  
    @Override
--  public TaskStateModel createNewStateModel(String partition) {
--    TaskStateModel model = new TaskStateModel(_workerId, partition, _taskFactory, _taskResultStore);
++  public TaskStateModel createNewStateModel(PartitionId partition) {
++    TaskStateModel model =
++        new TaskStateModel(_workerId, partition.toString(), _taskFactory, _taskResultStore);
      return model;
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
----------------------------------------------------------------------
diff --cc recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
index 4d54b23,4d54b23..a84f301
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
@@@ -24,6 -24,6 +24,7 @@@ import java.util.List
  import org.apache.helix.HelixManager;
  import org.apache.helix.HelixManagerFactory;
  import org.apache.helix.InstanceType;
++import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.manager.zk.ZKHelixAdmin;
  import org.apache.helix.manager.zk.ZNRecordSerializer;
  import org.apache.helix.manager.zk.ZkClient;
@@@ -63,7 -63,7 +64,8 @@@ public class Worker implements Runnabl
        StateMachineEngine stateMach = _manager.getStateMachineEngine();
        TaskStateModelFactory modelFactory =
            new TaskStateModelFactory(_instanceName, _taskFactory, _taskResultStore);
--      stateMach.registerStateModelFactory(TaskCluster.DEFAULT_STATE_MODEL, modelFactory);
++      stateMach.registerStateModelFactory(StateModelDefId.from(TaskCluster.DEFAULT_STATE_MODEL),
++          modelFactory);
  
        _manager.connect();
  

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
----------------------------------------------------------------------
diff --cc recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
index 3aec20c,0000000..c607b1b
mode 100644,000000..100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
@@@ -1,34 -1,0 +1,36 @@@
 +package org.apache.helix.userdefinedrebalancer;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
- import org.apache.helix.participant.statemachine.StateModelFactory;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.participant.statemachine.HelixStateModelFactory;
 +
 +/**
 + * This factory allows a participant to get the appropriate state model callbacks for the lock
 + * manager state model. This is used exactly once per participant to get a valid instance of a Lock,
 + * and then the same Lock instance is used for all state transition callbacks.
 + */
- public class LockFactory extends StateModelFactory<Lock> {
++public class LockFactory extends HelixStateModelFactory<Lock> {
 +  @Override
-   public Lock createNewStateModel(String lockName) {
-     return new Lock(lockName);
++  public Lock createNewStateModel(PartitionId partitionId) {
++    // TODO Auto-generated method stub
++    return new Lock(partitionId.stringify());
 +  }
 +}
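
The factory above now keys state model creation on PartitionId. For context, a hypothetical lock-unlock state model such a factory could hand out; the state names and callback signatures below are illustrative only (the recipe's real Lock class may differ) and are meant to show the shape of a model built once per partition:

import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

@StateModelInfo(initialState = "OFFLINE", states = { "LOCKED", "RELEASED", "OFFLINE" })
public class SimpleLock extends StateModel {
  private final String lockName;

  public SimpleLock(String lockName) {
    this.lockName = lockName;
  }

  @Transition(from = "RELEASED", to = "LOCKED")
  public void lock(Message m, NotificationContext context) {
    System.out.println("acquired " + lockName);
  }

  @Transition(from = "LOCKED", to = "RELEASED")
  public void release(Message m, NotificationContext context) {
    System.out.println("released " + lockName);
  }
}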

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
----------------------------------------------------------------------
diff --cc recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
index e65113c,0000000..78c906a
mode 100644,000000..100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
@@@ -1,84 -1,0 +1,92 @@@
 +package org.apache.helix.userdefinedrebalancer;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.helix.HelixManager;
- import org.apache.helix.controller.rebalancer.Rebalancer;
- import org.apache.helix.controller.stages.ClusterDataCache;
- import org.apache.helix.controller.stages.CurrentStateOutput;
- import org.apache.helix.model.IdealState;
- import org.apache.helix.model.Partition;
- import org.apache.helix.model.Resource;
++import org.apache.helix.api.Cluster;
++import org.apache.helix.api.State;
++import org.apache.helix.api.id.ParticipantId;
++import org.apache.helix.api.id.PartitionId;
++import org.apache.helix.api.id.StateModelDefId;
++import org.apache.helix.controller.rebalancer.HelixRebalancer;
++import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
++import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
++import org.apache.helix.controller.stages.ResourceCurrentState;
 +import org.apache.helix.model.ResourceAssignment;
 +import org.apache.helix.model.StateModelDefinition;
 +import org.apache.log4j.Logger;
 +
- public class LockManagerRebalancer implements Rebalancer {
++public class LockManagerRebalancer implements HelixRebalancer {
 +  private static final Logger LOG = Logger.getLogger(LockManagerRebalancer.class);
 +
 +  @Override
 +  public void init(HelixManager manager) {
 +    // do nothing; this rebalancer is independent of the manager
 +  }
 +
 +  /**
 +   * This rebalancer is invoked whenever there is a change in the cluster, including when new
 +   * participants join or leave, or the configuration of any participant changes. It is written
 +   * specifically to handle assignment of locks to nodes under the very simple lock-unlock state
 +   * model.
 +   */
 +  @Override
-   public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
++  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
++      Cluster cluster, ResourceCurrentState currentState) {
++    // get a typed context
++    PartitionedRebalancerContext context =
++        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
++
 +    // Initialize an empty mapping of locks to participants
-     ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
++    ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
 +
 +    // Get the list of live participants in the cluster
-     List<String> liveParticipants = new ArrayList<String>(clusterData.getLiveInstances().keySet());
++    List<ParticipantId> liveParticipants =
++        new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
 +
 +    // Get the state model (should be a simple lock/unlock model) and the highest-priority state
-     String stateModelName = currentIdealState.getStateModelDefRef();
-     StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
++    StateModelDefId stateModelDefId = context.getStateModelDefId();
++    StateModelDefinition stateModelDef = cluster.getStateModelMap().get(stateModelDefId);
 +    if (stateModelDef.getStatesPriorityList().size() < 1) {
 +      LOG.error("Invalid state model definition. There should be at least one state.");
 +      return assignment;
 +    }
-     String lockState = stateModelDef.getStatesPriorityList().get(0);
++    State lockState = stateModelDef.getTypedStatesPriorityList().get(0);
 +
 +    // Count the number of participants allowed to lock each lock
-     String stateCount = stateModelDef.getNumInstancesPerState(lockState);
++    String stateCount = stateModelDef.getNumParticipantsPerState(lockState);
 +    int lockHolders = 0;
 +    try {
 +      // a numeric value is a custom-specified number of participants allowed to lock the lock
 +      lockHolders = Integer.parseInt(stateCount);
 +    } catch (NumberFormatException e) {
 +      LOG.error("Invalid state model definition. The lock state does not have a valid count");
 +      return assignment;
 +    }
 +
 +    // Fairly assign the lock state to the participants using a simple mod-based sequential
 +    // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
 +    // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
 +    // number of participants as necessary.
 +    // This assumes a simple lock-unlock model where the only state of interest is which nodes have
 +    // acquired each lock.
 +    int i = 0;
-     for (Partition partition : resource.getPartitions()) {
-       Map<String, String> replicaMap = new HashMap<String, String>();
++    for (PartitionId partition : context.getPartitionSet()) {
++      Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
 +      for (int j = i; j < i + lockHolders; j++) {
 +        int participantIndex = j % liveParticipants.size();
-         String participant = liveParticipants.get(participantIndex);
++        ParticipantId participant = liveParticipants.get(participantIndex);
 +        // enforce that a participant can only have one instance of a given lock
 +        if (!replicaMap.containsKey(participant)) {
 +          replicaMap.put(participant, lockState);
 +        }
 +      }
 +      assignment.addReplicaMap(partition, replicaMap);
 +      i++;
 +    }
 +    return assignment;
 +  }
 +}
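
The mod-based assignment in computeResourceMapping above is easiest to see with concrete numbers. A standalone illustration of just that arithmetic, detached from the Helix types (the participant names and counts are made up):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ModAssignmentDemo {
  public static void main(String[] args) {
    List<String> participants = new ArrayList<String>();
    for (int n = 0; n < 4; n++) {
      participants.add("node_" + n);
    }
    int lockHolders = 2; // the per-state count parsed from the state model definition
    int numLocks = 6;

    // lock i is held by participants (i, i+1, ..., i+lockHolders-1) mod |participants|
    for (int i = 0; i < numLocks; i++) {
      Map<String, String> replicaMap = new LinkedHashMap<String, String>();
      for (int j = i; j < i + lockHolders; j++) {
        String participant = participants.get(j % participants.size());
        replicaMap.put(participant, "LOCKED");
      }
      System.out.println("lock_" + i + " -> " + replicaMap.keySet());
    }
    // prints: lock_0 -> [node_0, node_1], lock_1 -> [node_1, node_2],
    //         lock_2 -> [node_2, node_3], lock_3 -> [node_3, node_0], ...
  }
}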

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
----------------------------------------------------------------------
diff --cc recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
index ee363b5,0000000..723f9f2
mode 100644,000000..100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
@@@ -1,79 -1,0 +1,80 @@@
 +package org.apache.helix.userdefinedrebalancer;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +import java.util.List;
 +
 +import org.apache.helix.HelixManager;
 +import org.apache.helix.HelixManagerFactory;
 +import org.apache.helix.InstanceType;
++import org.apache.helix.api.id.StateModelDefId;
 +import org.apache.helix.manager.zk.ZKHelixAdmin;
 +import org.apache.helix.model.InstanceConfig;
 +
 +public class LockProcess {
 +  private final String clusterName;
 +  private final String zkAddress;
 +  private final String instanceName;
 +  private final String stateModelName;
 +  private HelixManager participantManager;
 +
 +  LockProcess(String clusterName, String zkAddress, String instanceName, String stateModelName) {
 +    this.clusterName = clusterName;
 +    this.zkAddress = zkAddress;
 +    this.instanceName = instanceName;
 +    this.stateModelName = stateModelName;
 +
 +  }
 +
 +  public void start() throws Exception {
 +    System.out.println("STARTING " + instanceName);
 +    configureInstance(instanceName);
 +    participantManager =
 +        HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
 +            zkAddress);
-     participantManager.getStateMachineEngine().registerStateModelFactory(stateModelName,
-         new LockFactory());
++    participantManager.getStateMachineEngine().registerStateModelFactory(
++        StateModelDefId.from(stateModelName), new LockFactory());
 +    participantManager.connect();
 +    System.out.println("STARTED " + instanceName);
 +  }
 +
 +  /**
 +   * Configure the instance, the configuration of each node is available to
 +   * other nodes.
 +   * @param instanceName
 +   */
 +  private void configureInstance(String instanceName) {
 +    ZKHelixAdmin helixAdmin = new ZKHelixAdmin(zkAddress);
 +
 +    List<String> instancesInCluster = helixAdmin.getInstancesInCluster(clusterName);
 +    if (instancesInCluster == null || !instancesInCluster.contains(instanceName)) {
 +      InstanceConfig config = new InstanceConfig(instanceName);
 +      config.setHostName("localhost");
 +      config.setPort("12000");
 +      helixAdmin.addInstance(clusterName, config);
 +    }
 +  }
 +
 +  public void stop() {
 +    if (participantManager != null) {
 +      participantManager.disconnect();
 +    }
 +  }
 +}

