helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject git commit: [HELIX-215] Adding new recipe on how to write a custom rebalancer
Date Tue, 03 Sep 2013 16:25:23 GMT
Updated Branches:
  refs/heads/master 1d3c32ed2 -> 19c684174


[HELIX-215] Adding new recipe on how to write a custom rebalancer


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/19c68417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/19c68417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/19c68417

Branch: refs/heads/master
Commit: 19c684174e7d9f6bb84a7feab255a505c6f6ad2c
Parents: 1d3c32e
Author: Kishore Gopalakrishna <g.kishore@gmail.com>
Authored: Tue Sep 3 09:24:56 2013 -0700
Committer: Kishore Gopalakrishna <g.kishore@gmail.com>
Committed: Tue Sep 3 09:24:56 2013 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   9 +
 .../stages/BestPossibleStateCalcStage.java      |   8 +-
 .../java/org/apache/helix/model/IdealState.java |  14 +-
 .../apache/helix/tools/YAMLClusterSetup.java    | 287 +++++++++++++++++++
 pom.xml                                         |  10 +
 recipes/pom.xml                                 |   1 +
 recipes/user-defined-rebalancer/README.md       | 254 ++++++++++++++++
 recipes/user-defined-rebalancer/pom.xml         | 139 +++++++++
 .../src/main/config/log4j.properties            |  31 ++
 .../helix/userdefinedrebalancer/Lock.java       |  48 ++++
 .../userdefinedrebalancer/LockFactory.java      |  34 +++
 .../userdefinedrebalancer/LockManagerDemo.java  | 192 +++++++++++++
 .../LockManagerRebalancer.java                  |  84 ++++++
 .../userdefinedrebalancer/LockProcess.java      |  79 +++++
 .../src/main/resources/lock-manager-config.yaml |  69 +++++
 .../src/test/conf/testng.xml                    |  27 ++
 16 files changed, 1279 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index af04d85..22d1b2c 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -150,6 +150,11 @@ under the License.
       <artifactId>guava</artifactId>
       <version>r09</version>
     </dependency>
+      <dependency>
+        <groupId>org.yaml</groupId>
+        <artifactId>snakeyaml</artifactId>
+        <version>1.12</version>
+      </dependency>
   </dependencies>
   <build>
     <resources>
@@ -213,6 +218,10 @@ under the License.
               <mainClass>org.apache.helix.tools.JmxDumper</mainClass>
               <name>JmxDumper</name>
             </program>
+            <program>
+              <mainClass>org.apache.helix.tools.YAMLClusterSetup</mainClass>
+              <name>yaml-cluster-setup</name>
+            </program>
           </programs>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index e812e16..11955f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -118,9 +118,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       rebalancer.init(manager);
       ResourceAssignment partitionStateAssignment =
           rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
-        output.setState(resourceName, partition, newStateMap);
+      if (partitionStateAssignment != null) {
+        for (Partition partition : resource.getPartitions()) {
+          Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+          output.setState(resourceName, partition, newStateMap);
+        }
       }
     }
     return output;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index e14940a..90a2dff 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -461,13 +461,19 @@ public class IdealState extends HelixProperty {
     return _record.getSimpleField(IdealStateProperty.INSTANCE_GROUP_TAG.toString());
   }
 
+  /**
+   * Update the ideal state mapping from a ResourceAssignment
+   * @param assignment ResourceAssignment result from the rebalancer
+   */
   public void updateFromAssignment(ResourceAssignment assignment) {
     _record.getMapFields().clear();
     _record.getListFields().clear();
-    for (Partition partition : assignment.getMappedPartitions()) {
-      Map<String, String> replicaMap = assignment.getReplicaMap(partition);
-      setInstanceStateMap(partition.getPartitionName(), replicaMap);
-      setPreferenceList(partition.getPartitionName(), new ArrayList<String>(replicaMap.keySet()));
+    if (assignment != null) {
+      for (Partition partition : assignment.getMappedPartitions()) {
+        Map<String, String> replicaMap = assignment.getReplicaMap(partition);
+        setInstanceStateMap(partition.getPartitionName(), replicaMap);
+        setPreferenceList(partition.getPartitionName(), new ArrayList<String>(replicaMap.keySet()));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
new file mode 100644
index 0000000..c7233ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -0,0 +1,287 @@
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ParticipantConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.ConstraintsConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.StateModelConfig;
+import org.apache.log4j.Logger;
+import org.yaml.snakeyaml.Yaml;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Supports HelixAdmin operations specified by a YAML configuration file defining a cluster,
+ * resources, participants, etc.
+ * See the user-defined-rebalancer recipe for an annotated example file.
+ */
+public class YAMLClusterSetup {
+  private static final Logger LOG = Logger.getLogger(YAMLClusterSetup.class);
+
+  private final String _zkAddress;
+
+  /**
+   * Start the YAML parser for a given zookeeper instance
+   * @param zkAddress zookeeper address handed to the ZKHelixAdmin created by setupCluster
+   */
+  public YAMLClusterSetup(String zkAddress) {
+    _zkAddress = zkAddress;
+  }
+
+  /**
+   * Set up the cluster by parsing a YAML file. The cluster is added first, then every listed
+   * participant, then every listed resource.
+   * @param input InputStream representing the file; this method does not close it explicitly,
+   *          so the caller remains responsible for the stream
+   * @return ClusterConfig Java wrapper of the configuration file
+   * @throws HelixException if a required field (cluster name, resource name, state model name,
+   *           rebalance mode, participant name/host/port, ...) is missing or inconsistent
+   */
+  public YAMLClusterConfig setupCluster(InputStream input) {
+    // parse the YAML
+    Yaml yaml = new Yaml();
+    YAMLClusterConfig cfg = yaml.loadAs(input, YAMLClusterConfig.class);
+
+    // validate before constructing the admin so nothing is created for an unusable file;
+    // cfg may itself be null (e.g. when the document is empty)
+    if (cfg == null || cfg.clusterName == null) {
+      throw new HelixException("Cluster name is required!");
+    }
+
+    // create the cluster
+    HelixAdmin helixAdmin = new ZKHelixAdmin(_zkAddress);
+    helixAdmin.addCluster(cfg.clusterName);
+
+    // add each participant
+    if (cfg.participants != null) {
+      for (ParticipantConfig participant : cfg.participants) {
+        helixAdmin.addInstance(cfg.clusterName, getInstanceCfg(participant));
+      }
+    }
+
+    // add each resource
+    if (cfg.resources != null) {
+      for (ResourceConfig resource : cfg.resources) {
+        addResource(helixAdmin, cfg.clusterName, resource);
+      }
+    }
+    return cfg;
+  }
+
+  /**
+   * Add one resource to the cluster: register its state model when the file spells one out,
+   * create the resource, attach the user-defined rebalancer class if requested, and rebalance.
+   * @param helixAdmin admin used to issue the operations
+   * @param clusterName name of the cluster that receives the resource
+   * @param resource resource section of the YAML file
+   * @throws HelixException if the resource has no name, no named state model, or no rebalance mode
+   */
+  private static void addResource(HelixAdmin helixAdmin, String clusterName,
+      ResourceConfig resource) {
+    if (resource == null || resource.name == null) {
+      throw new HelixException("Resources must be named!");
+    }
+    if (resource.stateModel == null || resource.stateModel.name == null) {
+      throw new HelixException("Resource must specify a named state model!");
+    }
+    // if states is null, assume using a built-in or already-added state model
+    if (resource.stateModel.states != null) {
+      StateModelDefinition stateModelDef =
+          getStateModelDef(resource.stateModel, resource.constraints);
+      helixAdmin.addStateModelDef(clusterName, resource.stateModel.name, stateModelDef);
+    }
+
+    // partition and replica counts both default to 1 when the file does not give them;
+    // reading into an Integer first avoids unboxing a null value
+    int partitions = 1;
+    int replicas = 1;
+    if (resource.partitions != null) {
+      Integer partitionCount = resource.partitions.get("count");
+      if (partitionCount != null) {
+        partitions = partitionCount;
+      }
+      Integer replicaCount = resource.partitions.get("replicas");
+      if (replicaCount != null) {
+        replicas = replicaCount;
+      }
+    }
+
+    String mode = (resource.rebalancer == null) ? null : resource.rebalancer.get("mode");
+    if (mode == null) {
+      throw new HelixException("Rebalance mode is required!");
+    }
+    helixAdmin.addResource(clusterName, resource.name, partitions, resource.stateModel.name,
+        mode);
+
+    // user-defined rebalancer: only applies when both the class and the matching mode are given
+    String rebalancerClass = resource.rebalancer.get("class");
+    if (rebalancerClass != null && RebalanceMode.USER_DEFINED.toString().equals(mode)) {
+      IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resource.name);
+      idealState.setRebalancerClassName(rebalancerClass);
+      helixAdmin.setResourceIdealState(clusterName, resource.name, idealState);
+    }
+    helixAdmin.rebalance(clusterName, resource.name, replicas);
+  }
+
+  /**
+   * Convert a participant entry of the YAML file into an InstanceConfig.
+   * @param participant participant section of the YAML file
+   * @return InstanceConfig named after the participant, with its host name and port set
+   * @throws HelixException if the participant, its name, its host, or its port is missing
+   */
+  private static InstanceConfig getInstanceCfg(ParticipantConfig participant) {
+    if (participant == null || participant.name == null || participant.host == null
+        || participant.port == null) {
+      throw new HelixException("Participant must have a specified name, host, and port!");
+    }
+    InstanceConfig instanceCfg = new InstanceConfig(participant.name);
+    instanceCfg.setHostName(participant.host);
+    instanceCfg.setPort(participant.port.toString());
+    return instanceCfg;
+  }
+
+  /**
+   * Build a StateModelDefinition from the state model and constraints sections of a resource.
+   * @param stateModel state model section; states, initialState and transitions are required
+   * @param constraints constraints section; may be null, as may any of its members
+   * @return the state model definition produced by StateModelDefinition.Builder
+   * @throws HelixException if a required element is missing or refers to an undeclared state
+   */
+  private static StateModelDefinition getStateModelDef(StateModelConfig stateModel,
+      ConstraintsConfig constraints) {
+    // Use a builder to define the state model
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(stateModel.name);
+    if (stateModel.states == null || stateModel.states.size() == 0) {
+      throw new HelixException("List of states are required in a state model!");
+    }
+    Set<String> stateSet = new HashSet<String>(stateModel.states);
+    if (stateModel.initialState == null) {
+      throw new HelixException("Initial state is required in a state model!");
+    } else if (!stateSet.contains(stateModel.initialState)) {
+      throw new HelixException("Initial state is not a valid state");
+    }
+    builder.initialState(stateModel.initialState);
+
+    // Build a helper for state priorities; position in the list is the priority
+    Map<String, Integer> statePriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.state != null && constraints.state.priorityList != null) {
+      int statePriority = 0;
+      for (String state : constraints.state.priorityList) {
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state
+              + " in the state priority list is not in the state list!");
+        }
+        statePriorities.put(state, statePriority);
+        statePriority++;
+      }
+    }
+
+    // Add states, set state priorities
+    for (String state : stateModel.states) {
+      if (statePriorities.containsKey(state)) {
+        builder.addState(state, statePriorities.get(state));
+      } else {
+        builder.addState(state);
+      }
+    }
+
+    // Set state counts. The section is guarded the same way as the priority lists so that a
+    // state model declared without constraints (or without counts) does not hit a null reference.
+    if (constraints != null && constraints.state != null && constraints.state.counts != null) {
+      for (Map<String, String> counts : constraints.state.counts) {
+        String state = counts.get("name");
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state
+              + " has a count, but not in the state list!");
+        }
+        String count = counts.get("count");
+        if (count == null) {
+          throw new HelixException("State " + state + " is listed under counts without a count!");
+        }
+        builder.dynamicUpperBound(state, count);
+      }
+    }
+
+    // Build a helper for transition priorities; position in the list is the priority
+    Map<String, Integer> transitionPriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.transition != null
+        && constraints.transition.priorityList != null) {
+      int transitionPriority = 0;
+      for (String transition : constraints.transition.priorityList) {
+        transitionPriorities.put(transition, transitionPriority);
+        transitionPriority++;
+      }
+    }
+
+    // Add the transitions
+    if (stateModel.transitions == null || stateModel.transitions.size() == 0) {
+      throw new HelixException("Transitions are required!");
+    }
+    for (Map<String, String> transitions : stateModel.transitions) {
+      String name = transitions.get("name");
+      String from = transitions.get("from");
+      String to = transitions.get("to");
+      if (name == null || from == null || to == null) {
+        throw new HelixException("All transitions must have a name, a from state, and a to state");
+      }
+      // the transition name is only used to look up its priority
+      if (transitionPriorities.containsKey(name)) {
+        builder.addTransition(from, to, transitionPriorities.get(name));
+      } else {
+        builder.addTransition(from, to);
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Java wrapper for the YAML input file. The public fields are populated by the YAML parser
+   * in setupCluster and mirror the structure of the file.
+   */
+  public static class YAMLClusterConfig {
+    /** name of the cluster; required */
+    public String clusterName;
+    /** resources to add to the cluster; may be null */
+    public List<ResourceConfig> resources;
+    /** participants to add to the cluster; may be null */
+    public List<ParticipantConfig> participants;
+
+    /**
+     * A single resource entry
+     */
+    public static class ResourceConfig {
+      /** name of the resource; required */
+      public String name;
+      /** rebalancer settings; "mode" is required, "class" is read for USER_DEFINED mode */
+      public Map<String, String> rebalancer;
+      /** optional partitioning settings; "count" and "replicas" are read, each defaulting to 1 */
+      public Map<String, Integer> partitions;
+      /** state model of the resource; required, and must be named */
+      public StateModelConfig stateModel;
+      /** optional constraints applied when a new state model definition is built */
+      public ConstraintsConfig constraints;
+
+      /**
+       * State model section of a resource
+       */
+      public static class StateModelConfig {
+        /** name of the state model; required */
+        public String name;
+        /** states of the model; when null, the model is assumed to exist already */
+        public List<String> states;
+        /** transitions, each a map with the keys "name", "from" and "to" */
+        public List<Map<String, String>> transitions;
+        /** initial state; must be one of the listed states */
+        public String initialState;
+      }
+
+      /**
+       * Constraints section of a resource
+       */
+      public static class ConstraintsConfig {
+        public StateConstraintsConfig state;
+        public TransitionConstraintsConfig transition;
+
+        /**
+         * Constraints on states
+         */
+        public static class StateConstraintsConfig {
+          /** per-state counts, each a map with the keys "name" and "count" */
+          public List<Map<String, String>> counts;
+          /** state names in priority order, earliest entry first */
+          public List<String> priorityList;
+        }
+
+        /**
+         * Constraints on transitions
+         */
+        public static class TransitionConstraintsConfig {
+          /** transition names in priority order, earliest entry first */
+          public List<String> priorityList;
+        }
+      }
+    }
+
+    /**
+     * A single participant entry; name, host and port are all required
+     */
+    public static class ParticipantConfig {
+      public String name;
+      public String host;
+      public Integer port;
+    }
+  }
+
+  /**
+   * Start a cluster defined by a YAML file
+   * @param args zkAddr, yamlFile
+   */
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      LOG.error("USAGE: YAMLClusterSetup zkAddr yamlFile");
+      return;
+    }
+    String zkAddress = args[0];
+    String yamlFile = args[1];
+
+    InputStream input;
+    try {
+      input = new FileInputStream(new File(yamlFile));
+    } catch (FileNotFoundException e) {
+      LOG.error("Could not open " + yamlFile, e);
+      return;
+    }
+    try {
+      new YAMLClusterSetup(zkAddress).setupCluster(input);
+    } finally {
+      // the stream was opened here, so it is released here even if the setup fails
+      try {
+        input.close();
+      } catch (java.io.IOException e) {
+        LOG.warn("Could not close " + yamlFile, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ee6f573..6840410 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,6 +164,11 @@ under the License.
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>Sonatype-public</id>
+      <name>SnakeYAML repository</name>
+      <url>http://oss.sonatype.org/content/groups/public/</url>
+    </repository>
   </repositories>
 
  
@@ -285,6 +290,11 @@ under the License.
         <artifactId>testng</artifactId>
         <version>6.0.1</version>
       </dependency>
+      <dependency>
+        <groupId>org.yaml</groupId>
+        <artifactId>snakeyaml</artifactId>
+        <version>1.12</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/pom.xml b/recipes/pom.xml
index d0a93b1..3667650 100644
--- a/recipes/pom.xml
+++ b/recipes/pom.xml
@@ -33,6 +33,7 @@ under the License.
     <module>rabbitmq-consumer-group</module>
     <module>rsync-replicated-file-system</module>
     <module>distributed-lock-manager</module>
+    <module>user-defined-rebalancer</module>
     <module>task-execution</module>
     <module>service-discovery</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/README.md
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/README.md b/recipes/user-defined-rebalancer/README.md
new file mode 100644
index 0000000..3dca51c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/README.md
@@ -0,0 +1,254 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Distributed lock manager with a user-defined rebalancer and YAML configuration
+------------------------------------------------------------------------------
+This recipe is a second take on the distributed lock manager example with two key differences:
+  * Instead of specifying the cluster using the HelixAdmin Java API, a YAML file indicates the cluster, its resources, and its participants. This is a simplified way to bootstrap cluster creation with a compact, logical hierarchy.
+  * The rebalancing process (i.e. the algorithm that uses the cluster state to determine an assignment of locks to participants) is specified in a class defined by the recipe itself, completely independent of Helix.
+
+For additional background and motivation, see the distributed-lock-manager recipe.
+
+### YAML Cluster Setup
+The YAML configuration below specifies a state model for a lock in which it can be locked and unlocked. At most one participant can hold the lock at any time, and there are 12 locks to distribute across 3 participants.
+
+```
+clusterName: lock-manager-custom-rebalancer # unique name for the cluster
+resources:
+  - name: lock-group # unique resource name
+    rebalancer: # we will provide our own rebalancer
+      mode: USER_DEFINED
+      class: org.apache.helix.userdefinedrebalancer.LockManagerRebalancer
+    partitions:
+      count: 12 # number of locks
+      replicas: 1 # number of simultaneous holders for each lock
+    stateModel:
+      name: lock-unlock # unique model name
+      states: [LOCKED, RELEASED, DROPPED] # the list of possible states
+      transitions: # the list of possible transitions
+        - name: Unlock
+          from: LOCKED
+          to: RELEASED
+        - name: Lock
+          from: RELEASED
+          to: LOCKED
+        - name: DropLock
+          from: LOCKED
+          to: DROPPED
+        - name: DropUnlock
+          from: RELEASED
+          to: DROPPED
+        - name: Undrop
+          from: DROPPED
+          to: RELEASED
+      initialState: RELEASED
+    constraints:
+      state:
+        counts: # maximum number of replicas of a partition that can be in each state
+          - name: LOCKED
+            count: "1"
+          - name: RELEASED
+            count: "-1"
+          - name: DROPPED
+            count: "-1"
+        priorityList: [LOCKED, RELEASED, DROPPED] # states in order of priority
+      transition: # transitions priority to enforce order that transitions occur
+        priorityList: [Unlock, Lock, Undrop, DropUnlock, DropLock]
+participants: # list of nodes that can acquire locks
+  - name: localhost_12001
+    host: localhost
+    port: 12001
+  - name: localhost_12002
+    host: localhost
+    port: 12002
+  - name: localhost_12003
+    host: localhost
+    port: 12003
+```
+
+### User-Defined Rebalancer
+The implementation of the Rebalancer interface is quite simple. It assumes a Lock/Unlock model where the lock state has highest priority. It uses a mod-based approach to fairly assign locks to participants so that no participant holds more than one instance of a lock, and each lock is only assigned to as many participants as can hold the same lock simultaneously. In the configuration above, only one participant can hold a given lock in the locked state.
+
+The result is a ResourceAssignment, which maps each lock to its holder and its lock state. In Helix terminology, the lock manager is the resource, a lock is a partition, its holder is a participant, and the lock state is the current state of the lock based on one of the pre-defined states in the state model.
+
+```
+@Override
+public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+    CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+  // Initialize an empty mapping of locks to participants
+  ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
+
+  // Get the list of live participants in the cluster
+  List<String> liveParticipants = new ArrayList<String>(clusterData.getLiveInstances().keySet());
+
+  // Get the state model (should be a simple lock/unlock model) and the highest-priority state
+  String stateModelName = currentIdealState.getStateModelDefRef();
+  StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+  if (stateModelDef.getStatesPriorityList().size() < 1) {
+    LOG.error("Invalid state model definition. There should be at least one state.");
+    return assignment;
+  }
+  String lockState = stateModelDef.getStatesPriorityList().get(0);
+
+  // Count the number of participants allowed to lock each lock
+  String stateCount = stateModelDef.getNumInstancesPerState(lockState);
+  int lockHolders = 0;
+  try {
+    // a numeric value is a custom-specified number of participants allowed to lock the lock
+    lockHolders = Integer.parseInt(stateCount);
+  } catch (NumberFormatException e) {
+    LOG.error("Invalid state model definition. The lock state does not have a valid count");
+    return assignment;
+  }
+
+  // Fairly assign the lock state to the participants using a simple mod-based sequential
+  // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
+  // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
+  // number of participants as necessary.
+  // This assumes a simple lock-unlock model where the only state of interest is which nodes have
+  // acquired each lock.
+  int i = 0;
+  for (Partition partition : resource.getPartitions()) {
+    Map<String, String> replicaMap = new HashMap<String, String>();
+    for (int j = i; j < i + lockHolders; j++) {
+      int participantIndex = j % liveParticipants.size();
+      String participant = liveParticipants.get(participantIndex);
+      // enforce that a participant can only have one instance of a given lock
+      if (!replicaMap.containsKey(participant)) {
+        replicaMap.put(participant, lockState);
+      }
+    }
+    assignment.addReplicaMap(partition, replicaMap);
+    i++;
+  }
+  return assignment;
+}
+```
+----------------------------------------------------------------------------------------
+
+#### In Action
+
+##### Specifying a Lock StateModel
+In our configuration file, we indicated a special state model with two key states: LOCKED and RELEASED. Thus, we need to provide the participant with a subclass of StateModel that can respond to transitions between those states.
+
+```
+public class Lock extends StateModel {
+  private String lockName;
+
+  public Lock(String lockName) {
+    this.lockName = lockName;
+  }
+
+  @Transition(from = "RELEASED", to = "LOCKED")
+  public void lock(Message m, NotificationContext context) {
+    System.out.println(context.getManager().getInstanceName() + " acquired lock:" + lockName);
+  }
+
+  @Transition(from = "LOCKED", to = "RELEASED")
+  public void release(Message m, NotificationContext context) {
+    System.out.println(context.getManager().getInstanceName() + " releasing lock:" + lockName);
+  }
+}
+```
+
+##### Loading the configuration file
+We include a YAML file parser that will set up the cluster according to the specifications of the file. Here is the code that this example uses to set up the cluster:
+
+```
+YAMLClusterSetup setup = new YAMLClusterSetup(zkAddress);
+InputStream input =
+    Thread.currentThread().getContextClassLoader()
+        .getResourceAsStream("lock-manager-config.yaml");
+YAMLClusterSetup.YAMLClusterConfig config = setup.setupCluster(input);
+```
+At this point, the cluster is set up and the configuration is persisted on Zookeeper. The config variable contains a snapshot of this configuration for further access.
+
+##### Building 
+```
+git clone https://git-wip-us.apache.org/repos/asf/incubator-helix.git
+cd incubator-helix
+mvn clean install package -DskipTests
+cd recipes/user-defined-rebalancer/target/user-defined-rebalancer-pkg/bin
+chmod +x *
+./lock-manager-demo
+```
+
+##### Output
+
+```
+./lock-manager-demo 
+STARTING localhost_12002
+STARTING localhost_12001
+STARTING localhost_12003
+STARTED localhost_12001
+STARTED localhost_12003
+STARTED localhost_12002
+localhost_12003 acquired lock:lock-group_4
+localhost_12002 acquired lock:lock-group_8
+localhost_12001 acquired lock:lock-group_10
+localhost_12001 acquired lock:lock-group_3
+localhost_12001 acquired lock:lock-group_6
+localhost_12003 acquired lock:lock-group_0
+localhost_12002 acquired lock:lock-group_5
+localhost_12001 acquired lock:lock-group_9
+localhost_12002 acquired lock:lock-group_2
+localhost_12003 acquired lock:lock-group_7
+localhost_12003 acquired lock:lock-group_11
+localhost_12002 acquired lock:lock-group_1
+lockName  acquired By
+======================================
+lock-group_0  localhost_12003
+lock-group_1  localhost_12002
+lock-group_10 localhost_12001
+lock-group_11 localhost_12003
+lock-group_2  localhost_12002
+lock-group_3  localhost_12001
+lock-group_4  localhost_12003
+lock-group_5  localhost_12002
+lock-group_6  localhost_12001
+lock-group_7  localhost_12003
+lock-group_8  localhost_12002
+lock-group_9  localhost_12001
+Stopping the first participant
+localhost_12001 Interrupted
+localhost_12002 acquired lock:lock-group_3
+localhost_12003 acquired lock:lock-group_6
+localhost_12003 acquired lock:lock-group_10
+localhost_12002 acquired lock:lock-group_9
+lockName  acquired By
+======================================
+lock-group_0  localhost_12003
+lock-group_1  localhost_12002
+lock-group_10 localhost_12003
+lock-group_11 localhost_12003
+lock-group_2  localhost_12002
+lock-group_3  localhost_12002
+lock-group_4  localhost_12003
+lock-group_5  localhost_12002
+lock-group_6  localhost_12003
+lock-group_7  localhost_12003
+lock-group_8  localhost_12002
+lock-group_9  localhost_12002
+```
+
+----------------------------------------------------------------------------------------
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/pom.xml b/recipes/user-defined-rebalancer/pom.xml
new file mode 100644
index 0000000..ebd972c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.helix.recipes</groupId>
+    <artifactId>recipes</artifactId>
+    <version>0.6.2-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>user-defined-rebalancer</artifactId>
+  <packaging>bundle</packaging>
+  <name>Apache Helix :: Recipes :: user-defined-rebalancer</name>
+
+  <properties>
+    <osgi.import>
+      org.apache.helix*,
+      org.apache.log4j,
+      *
+    </osgi.import>
+    <osgi.export>org.apache.helix.userdefinedrebalancer*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <version>6.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>appassembler-maven-plugin</artifactId>
+          <configuration>
+            <!-- Set the target configuration directory to be used in the bin scripts -->
+            <!-- <configurationDirectory>conf</configurationDirectory> -->
+            <!-- Copy the contents from "/src/main/config" to the target configuration
+              directory in the assembled application -->
+            <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+            <!-- Include the target configuration directory in the beginning of
+              the classpath declaration in the bin scripts -->
+            <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+            <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+            <!-- Extra JVM arguments that will be included in the bin scripts -->
+            <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+            <!-- Generate bin scripts for windows and unix by default -->
+            <platforms>
+              <platform>windows</platform>
+              <platform>unix</platform>
+            </platforms>
+          </configuration>
+          <executions>
+            <execution>
+              <phase>package</phase>
+              <goals>
+                <goal>assemble</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.rat</groupId>
+          <artifactId>apache-rat-plugin</artifactId>
+            <configuration>
+              <excludes combine.children="append">
+              </excludes>
+            </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+            <program>
+              <mainClass>org.apache.helix.userdefinedrebalancer.LockManagerDemo</mainClass>
+              <name>lock-manager-demo</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/config/log4j.properties b/recipes/user-defined-rebalancer/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
new file mode 100644
index 0000000..ceba1ed
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
@@ -0,0 +1,48 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "RELEASED", states = {
+    "RELEASED", "LOCKED"
+})
+public class Lock extends StateModel {
+  private String lockName;
+
+  public Lock(String lockName) {
+    this.lockName = lockName;
+  }
+
+  @Transition(from = "RELEASED", to = "LOCKED")
+  public void lock(Message m, NotificationContext context) {
+    System.out.println(context.getManager().getInstanceName() + " acquired lock:" + lockName);
+  }
+
+  @Transition(from = "LOCKED", to = "RELEASED")
+  public void release(Message m, NotificationContext context) {
+    System.out.println(context.getManager().getInstanceName() + " releasing lock:" + lockName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
new file mode 100644
index 0000000..3aec20c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
@@ -0,0 +1,34 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * This factory allows a participant to get the appropriate state model callbacks for the lock
+ * manager state model. It is invoked with a lock name and returns a new Lock bound to that name
+ * on every call, so different locks do not share a single Lock instance.
+ */
+public class LockFactory extends StateModelFactory<Lock> {
+  @Override
+  public Lock createNewStateModel(String lockName) {
+    return new Lock(lockName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
new file mode 100644
index 0000000..727c5b7
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
@@ -0,0 +1,192 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.YAMLClusterSetup;
+import org.apache.log4j.Logger;
+
+public class LockManagerDemo {
+  private static final Logger LOG = Logger.getLogger(LockManagerDemo.class);
+
+  /**
+   * Runs the demo; the cluster, lock group and participants come from lock-manager-config.yaml
+   * @param args unused
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    final String zkAddress = "localhost:2199";
+
+    // default participant parameters in case the config does not specify them
+    int numInstances = 3;
+    boolean instancesSpecified = false;
+    Thread[] processArray = new Thread[numInstances];
+
+    // HelixManager for setting up the controller
+    HelixManager controllerManager = null;
+
+    // Name of the lock group resource (specified by the config file)
+    String lockGroupName = null;
+    try {
+      startLocalZookeeper(2199);
+      YAMLClusterSetup setup = new YAMLClusterSetup(zkAddress);
+      InputStream input =
+          Thread.currentThread().getContextClassLoader()
+              .getResourceAsStream("lock-manager-config.yaml");
+      final YAMLClusterSetup.YAMLClusterConfig config = setup.setupCluster(input);
+      if (config == null) {
+        LOG.error("Invalid YAML configuration");
+        return;
+      }
+      if (config.resources == null || config.resources.isEmpty()) {
+        LOG.error("Need to specify a resource!");
+        return;
+      }
+
+      // save resource name
+      lockGroupName = config.resources.get(0).name;
+
+      // save participants if specified
+      if (config.participants != null && config.participants.size() > 0) {
+        numInstances = config.participants.size();
+        instancesSpecified = true;
+        processArray = new Thread[numInstances];
+      }
+
+      // run each participant
+      for (int i = 0; i < numInstances; i++) {
+        String participantName;
+        if (instancesSpecified) {
+          participantName = config.participants.get(i).name;
+        } else {
+          participantName = "localhost_" + (12000 + i);
+        }
+        final String instanceName = participantName;
+        processArray[i] = new Thread(new Runnable() {
+
+          @Override
+          public void run() {
+            LockProcess lockProcess = null;
+
+            try {
+              lockProcess =
+                  new LockProcess(config.clusterName, zkAddress, instanceName,
+                      config.resources.get(0).stateModel.name);
+              lockProcess.start();
+              Thread.currentThread().join();
+            } catch (InterruptedException e) {
+              System.out.println(instanceName + " Interrupted");
+              if (lockProcess != null) {
+                lockProcess.stop();
+              }
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }
+
+        });
+        processArray[i].start();
+      }
+      Thread.sleep(3000);
+
+      // start the controller
+      controllerManager =
+          HelixControllerMain.startHelixController(zkAddress, config.clusterName, "controller",
+              HelixControllerMain.STANDALONE);
+      Thread.sleep(5000);
+
+      // HelixAdmin for querying cluster state
+      HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+      printStatus(admin, config.clusterName, lockGroupName);
+
+      // stop one participant
+      System.out.println("Stopping the first participant");
+      processArray[0].interrupt();
+      Thread.sleep(3000);
+      printStatus(admin, config.clusterName, lockGroupName);
+      Thread.currentThread().join();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (controllerManager != null) {
+        controllerManager.disconnect();
+      }
+      for (Thread process : processArray) {
+        if (process != null) {
+          process.interrupt();
+        }
+      }
+    }
+  }
+
+  private static void printStatus(HelixAdmin admin, String cluster, String resource) {
+    ExternalView externalView = admin.getResourceExternalView(cluster, resource);
+    TreeSet<String> treeSet = new TreeSet<String>(externalView.getPartitionSet());
+    System.out.println("lockName" + "\t" + "acquired By");
+    System.out.println("======================================");
+    for (String lockName : treeSet) {
+      Map<String, String> stateMap = externalView.getStateMap(lockName);
+      String acquiredBy = null;
+      if (stateMap != null) {
+        for (String instanceName : stateMap.keySet()) {
+          if ("LOCKED".equals(stateMap.get(instanceName))) {
+            acquiredBy = instanceName;
+            break;
+          }
+        }
+      }
+      System.out.println(lockName + "\t" + ((acquiredBy != null) ? acquiredBy : "NONE"));
+    }
+  }
+
+  private static void startLocalZookeeper(int port) throws Exception {
+    ZkServer server = null;
+    String baseDir = "/tmp/IntegrationTest/";
+    final String dataDir = baseDir + "zk/dataDir";
+    final String logDir = baseDir + "/tmp/logDir";
+    FileUtils.deleteDirectory(new File(dataDir));
+    FileUtils.deleteDirectory(new File(logDir));
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient) {
+
+      }
+    };
+    int zkPort = 2199;
+    server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
+    server.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
new file mode 100644
index 0000000..e65113c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
@@ -0,0 +1,84 @@
+package org.apache.helix.userdefinedrebalancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class LockManagerRebalancer implements Rebalancer {
+  private static final Logger LOG = Logger.getLogger(LockManagerRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+    // do nothing; this rebalancer is independent of the manager
+  }
+
+  /**
+   * This rebalancer is invoked whenever there is a change in the cluster, including when new
+   * participants join or leave, or the configuration of any participant changes. It is written
+   * specifically to handle assignment of locks to nodes under the very simple lock-unlock state
+   * model.
+   */
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // Initialize an empty mapping of locks to participants
+    ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
+
+    // Get the list of live participants in the cluster
+    List<String> liveParticipants = new ArrayList<String>(clusterData.getLiveInstances().keySet());
+
+    // Get the state model (should be a simple lock/unlock model) and the highest-priority state
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    if (stateModelDef.getStatesPriorityList().size() < 1) {
+      LOG.error("Invalid state model definition. There should be at least one state.");
+      return assignment;
+    }
+    String lockState = stateModelDef.getStatesPriorityList().get(0);
+
+    // Count the number of participants allowed to lock each lock
+    String stateCount = stateModelDef.getNumInstancesPerState(lockState);
+    int lockHolders = 0;
+    try {
+      // a numeric value is a custom-specified number of participants allowed to lock the lock
+      lockHolders = Integer.parseInt(stateCount);
+    } catch (NumberFormatException e) {
+      LOG.error("Invalid state model definition. The lock state does not have a valid count");
+      return assignment;
+    }
+
+    // Fairly assign the lock state to the participants using a simple mod-based sequential
+    // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
+    // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
+    // number of participants as necessary.
+    // This assumes a simple lock-unlock model where the only state of interest is which nodes have
+    // acquired each lock.
+    int i = 0;
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> replicaMap = new HashMap<String, String>();
+      for (int j = i; j < i + lockHolders; j++) {
+        int participantIndex = j % liveParticipants.size();
+        String participant = liveParticipants.get(participantIndex);
+        // enforce that a participant can only have one instance of a given lock
+        if (!replicaMap.containsKey(participant)) {
+          replicaMap.put(participant, lockState);
+        }
+      }
+      assignment.addReplicaMap(partition, replicaMap);
+      i++;
+    }
+    return assignment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
new file mode 100644
index 0000000..ee363b5
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
@@ -0,0 +1,79 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+
+public class LockProcess {
+  private final String clusterName;
+  private final String zkAddress;
+  private final String instanceName;
+  private final String stateModelName;
+  private HelixManager participantManager;
+
+  LockProcess(String clusterName, String zkAddress, String instanceName, String stateModelName) {
+    this.clusterName = clusterName;
+    this.zkAddress = zkAddress;
+    this.instanceName = instanceName;
+    this.stateModelName = stateModelName;
+
+  }
+
+  public void start() throws Exception {
+    System.out.println("STARTING " + instanceName);
+    configureInstance(instanceName);
+    participantManager =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+            zkAddress);
+    participantManager.getStateMachineEngine().registerStateModelFactory(stateModelName,
+        new LockFactory());
+    participantManager.connect();
+    System.out.println("STARTED " + instanceName);
+  }
+
+  /**
+   * Configure the instance; the configuration of each node is available to
+   * other nodes.
+   * @param instanceName
+   */
+  private void configureInstance(String instanceName) {
+    ZKHelixAdmin helixAdmin = new ZKHelixAdmin(zkAddress);
+
+    List<String> instancesInCluster = helixAdmin.getInstancesInCluster(clusterName);
+    if (instancesInCluster == null || !instancesInCluster.contains(instanceName)) {
+      InstanceConfig config = new InstanceConfig(instanceName);
+      config.setHostName("localhost");
+      config.setPort("12000");
+      helixAdmin.addInstance(clusterName, config);
+    }
+  }
+
+  public void stop() {
+    if (participantManager != null) {
+      participantManager.disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml b/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
new file mode 100644
index 0000000..b312877
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+clusterName: lock-manager-custom-rebalancer # unique name for the cluster
+resources:
+  - name: lock-group # unique resource name
+    rebalancer: # we will provide our own rebalancer
+      mode: USER_DEFINED
+      class: org.apache.helix.userdefinedrebalancer.LockManagerRebalancer
+    partitions:
+      count: 12 # number of locks
+      replicas: 1 # number of simultaneous holders for each lock
+    stateModel:
+      name: lock-unlock # unique model name
+      states: [LOCKED, RELEASED, DROPPED] # the list of possible states
+      transitions: # the list of possible transitions
+        - name: Unlock
+          from: LOCKED
+          to: RELEASED
+        - name: Lock
+          from: RELEASED
+          to: LOCKED
+        - name: DropLock
+          from: LOCKED
+          to: DROPPED
+        - name: DropUnlock
+          from: RELEASED
+          to: DROPPED
+        - name: Undrop
+          from: DROPPED
+          to: RELEASED
+      initialState: RELEASED
+    constraints:
+      state:
+        counts: # maximum number of replicas of a partition that can be in each state
+          - name: LOCKED
+            count: "1"
+          - name: RELEASED
+            count: "-1"
+          - name: DROPPED
+            count: "-1"
+        priorityList: [LOCKED, RELEASED, DROPPED] # states in order of priority
+      transition: # transition priorities, enforcing the order in which transitions occur
+        priorityList: [Unlock, Lock, Undrop, DropUnlock, DropLock]
+participants: # list of nodes that can acquire locks
+  - name: localhost_12001
+    host: localhost
+    port: 12001
+  - name: localhost_12002
+    host: localhost
+    port: 12002
+  - name: localhost_12003
+    host: localhost
+    port: 12003
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/test/conf/testng.xml b/recipes/user-defined-rebalancer/src/test/conf/testng.xml
new file mode 100644
index 0000000..58f0803
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+  <test name="Test" preserve-order="false">
+    <packages>
+      <package name="org.apache.helix"/>
+    </packages>
+  </test>
+</suite>


Mime
View raw message