helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/4] helix git commit: Refactoring cluster.py: Changing from module to class based functions. Abstracting Cluster object. Adding ZKCluster object.
Date Thu, 19 Feb 2015 05:13:44 GMT
Repository: helix
Updated Branches:
  refs/heads/master 01222c4f6 -> 80a4a13fd


Refactoring cluster.py:
Changing from module to class based functions.
Abstracting Cluster object.
Adding ZKCluster object.

functions.py:
Put all functions inside a class.
Switched to using a class variable for host selection.

zkfunctions.py:
Adding support for direct zookeeper access.

statemodeldefs.py:
Constants for state model definitions.

test/test_helix.py:
Unit tests for both Rest object and zookeeper object.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a714f002
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a714f002
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a714f002

Branch: refs/heads/master
Commit: a714f002cd2d9dd98bd86723da787c94f0d70ee7
Parents: 01222c4
Author: Casey Miller <camiller@linkedin.com>
Authored: Fri Feb 13 19:43:24 2015 -0800
Committer: Casey Miller <camiller@linkedin.com>
Committed: Fri Feb 13 19:43:24 2015 -0800

----------------------------------------------------------------------
 contributors/py-helix-admin/helix/cluster.py    |  91 +-
 contributors/py-helix-admin/helix/functions.py  | 925 +++++++++----------
 .../py-helix-admin/helix/statemodeldefs.py      |  39 +
 .../py-helix-admin/helix/test/test_helix.py     |  60 ++
 .../py-helix-admin/helix/zkfunctions.py         | 522 +++++++++++
 5 files changed, 1111 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/cluster.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/cluster.py b/contributors/py-helix-admin/helix/cluster.py
index 7db7014..175bff8 100644
--- a/contributors/py-helix-admin/helix/cluster.py
+++ b/contributors/py-helix-admin/helix/cluster.py
@@ -23,17 +23,21 @@ from partition import Partition
 from resourcegroup import ResourceGroup
 
 from helixexceptions import HelixException
-import functions
+from functions import RestHelixFunctions
+try:
+  from zkfunctions import ZookeeperHelixFunctions
+  zookeeper_ok = True
+except ImportError:
+  zookeeper_ok = False
 
 
-class Cluster(object):
+class BaseCluster(object):
     """Basic model of a cluster, holds participants, partitions, slices,
     external view, ideal state, resource groups"""
     ver = (1, 0)
 
-    def __init__(self, host, cluster):
-        super(Cluster, self).__init__()
-        self.host = host
+    def __init__(self, cluster):
+        super(BaseCluster, self).__init__()
         self.cluster = cluster
 
         # dynamically loaded data below
@@ -48,15 +52,13 @@ class Cluster(object):
                                            self.cluster)
 
     def __repr__(self):
-        return "{0}({1}, {2})".format(self.__class__.__name__, self.cluster,
-                                      self.host)
+        return "{0}({1}, {2})".format(self.__class__.__name__, self.cluster)
 
     def load_resources(self):
         """queries helix for resource groups and loades them into model"""
         try:
-            for cur_resource in functions.get_resource_groups(self.host,
-                                                              self.cluster):
-                data = functions.get_resource_group(self.host, self.cluster,
+            for cur_resource in self.functions.get_resource_groups(self.cluster):
+                data = self.functions.get_resource_group(self.cluster,
                                                     cur_resource)
                 name = data["id"]
                 count = data["simpleFields"]["NUM_PARTITIONS"]
@@ -94,7 +96,7 @@ class Cluster(object):
 
     def _cluster_exists(self):
         """verify cluster exists in helix"""
-        if self.cluster in functions.get_clusters(self.host):
+        if self.cluster in self.functions.get_clusters():
             return True
         return False
 
@@ -103,7 +105,7 @@ class Cluster(object):
         self._participants = {}
 
         try:
-            instances = functions.get_instances(self.host, self.cluster)
+            instances = self.functions.get_instances(self.cluster)
             for instance in instances:
                 ident = instance["id"]
                 enabled = instance["simpleFields"]["HELIX_ENABLED"]
@@ -129,7 +131,7 @@ class Cluster(object):
         """query partitions from helix and load into model"""
         self._partitions = {}
         for resource in self.resources:
-            newstate = functions.get_ideal_state(self.host, self.cluster,
+            newstate = self.functions.get_ideal_state(self.cluster,
                                                  resource)
             self._partitions[resource] = {}
             if newstate:
@@ -152,7 +154,7 @@ class Cluster(object):
         self._ideal_state = {}
         for resource in self.resources:
             self._ideal_state[resource] = \
-                functions.get_ideal_state(self.host, self.cluster, resource)
+                self.functions.get_ideal_state(self.cluster, resource)
 
     @property
     def ideal_state(self):
@@ -171,7 +173,7 @@ class Cluster(object):
         self._external_view = {}
         for resource in self.resources:
             self._external_view[resource] = \
-                functions.get_external_view(self.host, self.cluster, resource)
+                self.functions.get_external_view(self.cluster, resource)
 
     @property
     def external_view(self):
@@ -187,18 +189,18 @@ class Cluster(object):
 
     def get_config(self, config):
         """ get requested config from helix"""
-        return functions.get_config(self.host, self.cluster, config)
+        return self.functions.get_config(self.cluster, config)
 
     def set_cluster_config(self, config):
         """ set given configs in helix"""
-        return functions.set_config(self.host, self.cluster, config)
+        return self.functions.set_config(self.cluster, config)
 
     def set_resource_config(self, config, resource):
         """ set given configs in helix"""
         rname = resource
         if isinstance(resource, ResourceGroup):
             rname = resource.name
-        return functions.set_config(self.host, self.cluster, config,
+        return self.functions.set_config(self.cluster, config,
                                     resource=rname)
 
     def set_participant_config(self, config, participant):
@@ -206,34 +208,34 @@ class Cluster(object):
         if isinstance(participant, Participant):
             pname = participant.ident
         """ set given configs in helix"""
-        return functions.set_config(self.host, self.cluster, config,
+        return self.functions.set_config(self.cluster, config,
                                     participant=pname)
 
     def activate_cluster(self, grand, enabled=True):
         """activate this cluster with the specified grand cluster"""
-        return functions.activate_cluster(self.host, self.cluster, grand,
+        return self.functions.activate_cluster(self.cluster, grand,
                                           enabled)
 
     def deactivate_cluster(self, grand):
         """deactivate this cluster against the given grandcluster"""
-        return functions.deactivate_cluster(self.host, self.cluster, grand)
+        return self.functions.deactivate_cluster(self.cluster, grand)
 
     def add_cluster(self):
         """add cluster to helix"""
-        return functions.add_cluster(self.host, self.cluster)
+        return self.functions.add_cluster(self.cluster)
 
     def add_instance(self, instances, port):
         """add instance to cluster"""
-        return functions.add_instance(self.host, self.cluster, instances, port)
+        return self.functions.add_instance(self.cluster, instances, port)
 
     def rebalance(self, resource, replicas, key=""):
         """rebalance a resource group"""
-        return functions.rebalance(self.host, self.cluster, resource,
+        return self.functions.rebalance(self.cluster, resource,
                                    replicas, key)
 
     def add_resource(self, resource, partitions, state_model_def, mode=""):
         """add resource to cluster"""
-        return functions.add_resource(self.host, self.cluster, resource,
+        return self.functions.add_resource(self.cluster, resource,
                                       partitions, state_model_def, mode)
 
     def enable_instance(self, instance, enabled=True):
@@ -245,7 +247,7 @@ class Cluster(object):
             ident = instance
         else:
             raise HelixException("Instance must be a string or participant")
-        return functions.enable_instance(self.host, self.cluster, ident,
+        return self.functions.enable_instance(self.cluster, ident,
                                          enabled)
 
     def disable_instance(self, instance):
@@ -272,7 +274,7 @@ class Cluster(object):
         else:
             raise HelixException("Partition must be a string or partition")
 
-        return functions.enable_partition(self.host, self.cluster, resource,
+        return self.functions.enable_partition(self.cluster, resource,
                                           part_id, ident, enabled)
 
     def disable_partition(self, resource, partition, instance):
@@ -291,7 +293,7 @@ class Cluster(object):
             raise HelixException(
                 "Resource must be a string or a resource group object")
 
-        return functions.enable_resource(self.host, self.cluster,
+        return self.functions.enable_resource(self.cluster,
                                          resource_name, enabled)
 
     def disable_resource(self, resource):
@@ -308,7 +310,7 @@ class Cluster(object):
         else:
             raise HelixException("Resource must be resource object or string")
 
-        return functions.add_resource_tag(self.host, self.cluster,
+        return self.functions.add_resource_tag(self.cluster,
                                           resource_name, tag)
 
     # del resource not yet available in api
@@ -322,7 +324,7 @@ class Cluster(object):
     #     else:
     #         raise HelixException("Resource must be resource object or str")
     #
-    #     return functions.del_resource_tag(self.host, self.cluster,
+    #     return self.functions.del_resource_tag(self.cluster,
     #                                       resource_name, tag)
 
     def add_instance_tag(self, instance, tag):
@@ -335,7 +337,7 @@ class Cluster(object):
         else:
             raise HelixException("Instance must be a string or participant")
 
-        return functions.add_instance_tag(self.host, self.cluster, ident, tag)
+        return self.functions.add_instance_tag(self.cluster, ident, tag)
 
     def del_instance_tag(self, instance, tag):
         ident = None
@@ -347,18 +349,37 @@ class Cluster(object):
         else:
             raise HelixException("Instance must be a string or participant")
 
-        return functions.del_instance_tag(self.host, self.cluster, ident, tag)
+        return self.functions.del_instance_tag(self.cluster, ident, tag)
 
     def del_instance(self, instance):
         """remove instance from cluster, assumes instance is a
         participant object"""
-        return functions.del_instance(self.host, self.cluster, instance.ident)
+        return self.functions.del_instance(self.cluster, instance.ident)
 
     def del_resource(self, resource):
         """remove resource group from cluster, assumes resource is a
         resource object"""
-        return functions.del_resource(self.host, self.cluster, resource.name)
+        return self.functions.del_resource(self.cluster, resource.name)
 
     def del_cluster(self):
         """remove cluster from helix"""
-        return functions.del_cluster(self.host, self.cluster)
+        return self.functions.del_cluster(self.cluster)
+
+class Cluster(BaseCluster):
+    def __init__(self, host, cluster):
+       super(Cluster, self).__init__(cluster)
+       self.host = host
+       self.functions = RestHelixFunctions(host)
+
+
+class ZKCluster(BaseCluster):
+    def __init__(self, zookeeper_connect_string, zookeeper_root, cluster):
+        super(ZKCluster, self).__init__(cluster)
+
+        # We want to fail if kazoo cannot be found, but only if using the zookeeper object.
+        if not zookeeper_ok:
+          raise ImportError
+
+        self.zookeeper_connect_string = zookeeper_connect_string
+        self.zookeeper_root = zookeeper_root
+        self.functions = ZookeeperHelixFunctions(self.zookeeper_connect_string, self.zookeeper_root)

http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/functions.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/functions.py b/contributors/py-helix-admin/helix/functions.py
index 5e36404..9221aca 100644
--- a/contributors/py-helix-admin/helix/functions.py
+++ b/contributors/py-helix-admin/helix/functions.py
@@ -25,538 +25,481 @@ from helixexceptions import HelixException
 from helixexceptions import HelixAlreadyExistsException
 from helixexceptions import HelixDoesNotExistException
 
-
-def _post_payload(host, path, data, **kwargs):
-    """generic function to handle posting data
-    :rtype : return body of page
-    :param host: host to send data to
-    :param path: path to interact with
-    :param data: data to send
-    :param kwargs:  additional keyword args
-    """
-
-    if "http://" not in host:
-        host = "http://{0}".format(host)
-
-    res = Resource(host)
-
-    payload = "jsonParameters={0}".format(json.dumps(data))
-    for key, value in kwargs.items():
-        payload += '&{0}={1}'.format(key, json.dumps(value))
-    headers = {"Content-Type": "application/json"}
-    # print "path is %s" % path
-    page = res.post(path=path, payload=payload, headers=headers)
-    body = page.body_string()
-    if body:
-        body = json.loads(body)
+class RestHelixFunctions:
+    def __init__(self, host):
+        if "http://" not in host:
+            self.host = "http://{0}".format(host)
+        else:
+            self.host = host
+
+    def _post_payload(self, path, data, **kwargs):
+        """generic function to handle posting data
+        :rtype : return body of page
+        :param path: path to interact with
+        :param data: data to send
+        :param kwargs:  additional keyword args
+        """
+
+        res = Resource(self.host)
+
+        payload = "jsonParameters={0}".format(json.dumps(data))
+        for key, value in kwargs.items():
+            payload += '&{0}={1}'.format(key, json.dumps(value))
+        headers = {"Content-Type": "application/json"}
+        # print "path is %s" % path
+        page = res.post(path=path, payload=payload, headers=headers)
+        body = page.body_string()
+        if body:
+            body = json.loads(body)
+
+            if isinstance(body, dict) and "ERROR" in body:
+                raise HelixException(body["ERROR"])
+
+        # test what was returned, see if any exceptions need to be raise
+        # if not body:
+        # raise HelixException("body for path {0} is empty".format(path))
+        # else:
+        # print "BODY IS EMPTY FOR ", path
+        # print "BODY is %s." % body
+
+        return body
+
+
+    def _get_page(self, path):
+        """if we're specifying a cluster then verify that a cluster is set"""
+
+        res = Resource(self.host)
+
+        page = res.get(path=path)
+        data = page.body_string()
+        body = None
+        try:
+            body = json.loads(data)
+        except ValueError:
+            body = json.loads(data[:-3])
+
+        # test what was returned, see if any exceptions need to be raise
+        if not body:
+            raise HelixException("body for path {0} is empty".format(path))
 
         if isinstance(body, dict) and "ERROR" in body:
             raise HelixException(body["ERROR"])
 
-    # test what was returned, see if any exceptions need to be raise
-    # if not body:
-    # raise HelixException("body for path {0} is empty".format(path))
-    # else:
-    # print "BODY IS EMPTY FOR ", path
-    # print "BODY is %s." % body
-
-    return body
-
-
-def _get_page(host, path):
-    """if we're specifying a cluster then verify that a cluster is set"""
-
-    if "http://" not in host:
-        host = "http://{0}".format(host)
-
-    res = Resource(host)
-
-    page = res.get(path=path)
-    data = page.body_string()
-    body = None
-    try:
-        body = json.loads(data)
-    except ValueError:
-        body = json.loads(data[:-3])
-
-    # test what was returned, see if any exceptions need to be raise
-    if not body:
-        raise HelixException("body for path {0} is empty".format(path))
-
-    if isinstance(body, dict) and "ERROR" in body:
-        raise HelixException(body["ERROR"])
-
-    return body
-
+        return body
 
-def _delete_page(host, path):
-    """delete page at a given path"""
-    retval = None
-    if "http://" not in host:
-        host = "http://{0}".format(host)
 
-    res = Resource(host)
+    def _delete_page(self, path):
+        """delete page at a given path"""
+        retval = None
 
-    page = res.delete(path)
-    data = page.body_string()
-    if data:
-        retval = json.loads(data)
+        res = Resource(self.host)
 
-    return retval
+        page = res.delete(path)
+        data = page.body_string()
+        if data:
+            retval = json.loads(data)
 
+        return retval
 
-def get_clusters(host):
-    """ querys helix cluster for all clusters """
-    return _get_page(host, "/clusters")["listFields"]["clusters"]
 
+    def get_clusters(self):
+        """ querys helix cluster for all clusters """
+        return self._get_page("/clusters")["listFields"]["clusters"]
 
-def get_resource_groups(host, cluster):
-    """ querys helix cluster for resources groups of the current cluster"""
-    return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[
-        "listFields"]["ResourceGroups"]
 
+    def get_resource_groups(self, cluster):
+        """ querys helix cluster for resources groups of the current cluster"""
+        return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[
+            "listFields"]["ResourceGroups"]
 
-def get_resource_tags(host, cluster):
-    """returns a dict of resource tags for a cluster"""
-    return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[
-        "mapFields"]["ResourceTags"]
 
+    def get_resource_tags(self, cluster):
+        """returns a dict of resource tags for a cluster"""
+        return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[
+            "mapFields"]["ResourceTags"]
 
-def get_resource_group(host, cluster, resource):
-    """ gets the ideal state of the specified resource group of the
-    current cluster"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixException(
-            "{0} is not a resource group of {1}".format(resource, cluster))
 
-    return _get_page(host, "/clusters/{0}/resourceGroups/{1}".format(cluster,
-                                                                     resource))
+    def get_resource_group(self, cluster, resource):
+        """ gets the ideal state of the specified resource group of the
+        current cluster"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
 
+        return self._get_page("/clusters/{0}/resourceGroups/{1}".format(cluster,
+            resource))
 
-def get_ideal_state(host, cluster, resource):
-    """ gets the ideal state of the specified resource group of the
-    current cluster"""
+    def get_ideal_state(self, cluster, resource):
+        """ gets the ideal state of the specified resource group of the
+        current cluster"""
 
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixException(
-            "{0} is not a resource group of {1}".format(resource, cluster))
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
 
-    return _get_page(host, "/clusters/{0}/resourceGroups/{1}/idealState".
-                     format(cluster, resource))["mapFields"]
+        return self._get_page("/clusters/{0}/resourceGroups/{1}/idealState".
+                         format(cluster, resource))["mapFields"]
 
+    def get_external_view(self, cluster, resource):
+        """return the external view for a given cluster and resource"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
 
-def get_external_view(host, cluster, resource):
-    """return the external view for a given cluster and resource"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixException(
-            "{0} is not a resource group of {1}".format(resource, cluster))
+        return self._get_page("/clusters/{0}/resourceGroups/{1}/externalView".format(
+                             cluster, resource))["mapFields"]
 
-    return _get_page(host,
-                     "/clusters/{0}/resourceGroups/{1}/externalView".format(
-                         cluster, resource))["mapFields"]
+    def get_instances(self, cluster):
+        """get list of instances registered to the cluster"""
+        if not cluster:
+            raise HelixException("Cluster must be set before "
+                                 "calling this function")
 
+        return self._get_page("/clusters/{0}/instances".format(cluster))[
+            "instanceInfo"]
 
-def get_instances(host, cluster):
-    """get list of instances registered to the cluster"""
-    if not cluster:
-        raise HelixException("Cluster must be set before "
-                             "calling this function")
+    def get_instance_detail(self, cluster, name):
+        """get details of an instance"""
+        return self._get_page("/clusters/{0}/instances/{1}".format(cluster, name))
 
-    return _get_page(host, "/clusters/{0}/instances".format(cluster))[
-        "instanceInfo"]
+    def get_config(self, cluster, config):
+        """get requested config"""
+        return self._get_page("/clusters/{0}/configs/{1}".format(cluster, config))
 
+    def add_cluster(self, cluster):
+        """add a cluster to helix"""
+        if cluster in self.get_clusters():
+            raise HelixAlreadyExistsException(
+                "Cluster {0} already exists".format(cluster))
 
-def get_instance_detail(host, cluster, name):
-    """get details of an instance"""
-    return _get_page(host, "/clusters/{0}/instances/{1}".format(cluster, name))
+        data = {"command": "addCluster",
+                "clusterName": cluster}
 
-
-def get_config(host, cluster, config):
-    """get requested config"""
-    return _get_page(host, "/clusters/{0}/configs/{1}".format(cluster, config))
-
-
-def add_cluster(host, cluster):
-    """add a cluster to helix"""
-    if cluster in get_clusters(host):
-        raise HelixAlreadyExistsException(
-            "Cluster {0} already exists".format(cluster))
-
-    data = {"command": "addCluster",
-            "clusterName": cluster}
-
-    page = _post_payload(host, "/clusters", data)
-    return page
-
-
-def add_instance(host, cluster, instances, port):
-    """add a list of instances to a cluster"""
-    if cluster not in get_clusters(host):
-        raise HelixDoesNotExistException(
-            "Cluster {0} does not exist".format(cluster))
-
-    if not isinstance(instances, list):
-        instances = [instances]
-    instances = ["{0}:{1}".format(instance, port) for instance in instances]
-    try:
-        newinstances = set(instances)
-        oldinstances = set(
-            [x["id"].replace('_', ':') for x in get_instances(host, cluster)])
-        instances = list(newinstances - oldinstances)
-    except HelixException:
-        # this will get thrown if instances is empty,
-        # which if we're just populating should happen
-        pass
-
-    if instances:
-        data = {"command": "addInstance",
-                "instanceNames": ";".join(instances)}
-
-        instance_path = "/clusters/{0}/instances".format(cluster)
-        # print "adding to", instance_path
-        page = _post_payload(host, instance_path, data)
+        page = self._post_payload("/clusters", data)
         return page
 
-    else:
-        raise HelixAlreadyExistsException(
-            "All instances given already exist in cluster")
-
-
-def rebalance(host, cluster, resource, replicas, key=""):
-    """rebalance the given resource group"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixException(
-            "{0} is not a resource group of {1}".format(resource, cluster))
-
-    data = {"command": "rebalance",
-            "replicas": replicas}
-
-    if key:
-        data["key"] = key
-    page = _post_payload(host,
-                         "/clusters/{0}/resourceGroups/{1}/idealState".format(
-                             cluster, resource), data)
-    return page
-
-
-def activate_cluster(host, cluster, grand_cluster, enabled=True):
-    """activate the cluster with the grand cluster"""
-    if grand_cluster not in get_clusters(host):
-        raise HelixException(
-            "grand cluster {0} does not exist".format(grand_cluster))
-
-    data = {'command': 'activateCluster',
-            'grandCluster': grand_cluster}
-
-    if enabled:
-        data["enabled"] = "true"
-    else:
-        data["enabled"] = "false"
-
-    page = _post_payload(host, "/clusters/{0}".format(cluster), data)
-    return page
-
-
-def deactivate_cluster(host, cluster, grand_cluster):
-    """deactivate the cluster with the grand cluster"""
-    return activate_cluster(host, cluster, grand_cluster, enabled=False)
-
-
-def add_resource(host, cluster, resource, partitions,
-                 state_model_def, mode=""):
-    """Add given resource group"""
-    if resource in get_resource_groups(host, cluster):
-        raise HelixAlreadyExistsException(
-            "ResourceGroup {0} already exists".format(resource))
-
-    data = {"command": "addResource",
-            "resourceGroupName": resource,
-            "partitions": partitions,
-            "stateModelDefRef": state_model_def}
-
-    if mode:
-        data["mode"] = mode
-
-    return _post_payload(host, "/clusters/{0}/resourceGroups".format(cluster),
-                         data)
-
-
-def enable_resource(host, cluster, resource, enabled=True):
-    """enable or disable specified resource"""
-    data = {"command": "enableResource"}
-    if enabled:
-        data["enabled"] = "true"
-    else:
-        data["enabled"] = "false"
-
-    return _post_payload(host, "/clusters/{0}/resourceGroups/{1}".format(
-        cluster, resource), data)
-
-
-def disable_resource(host, cluster, resource):
-    """function for disabling resources"""
-    return enable_resource(host, cluster, resource, enabled=False)
-
-
-def alter_ideal_state(host, cluster, resource, newstate):
-    """alter ideal state"""
-    data = {"command": "alterIdealState"}
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroups/{1}/idealState".format(
-                             cluster, resource), data,
-                         newIdealState=newstate)
-
-
-def enable_instance(host, cluster, instance, enabled=True):
-    """enable instance within cluster"""
-    data = {"command": "enableInstance"}
-    if enabled:
-        data["enabled"] = "true"
-    else:
-        data["enabled"] = "false"
-
-    return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
-                                                                    instance),
-                         data)
-
-
-def disable_instance(host, cluster, instance):
-    """wrapper for ease of use for disabling an instance"""
-    return enable_instance(host, cluster, instance, enabled=False)
-
-
-def swap_instance(host, cluster, old, new):
-    """swap instance"""
-    data = {"command": "swapInstance",
-            "oldInstance": old,
-            "newInstance": new}
-
-    return _post_payload(host, "/cluster/{0}/instances".format(cluster), data)
-
-
-def enable_partition(host, cluster, resource, partition, instance,
-                     enabled=True):
-    """enable Partition """
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
-
-    data = {"command": "enablePartition",
-            "resource": resource,
-            "partition": partition,
-            "enabled": enabled}
-    return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
-                                                                    instance),
-                         data)
-
-
-def disable_partition(host, cluster, resource, partitions, instance):
-    """disable Partition """
-    return enable_partition(host, cluster, resource, partitions, instance,
-                            enabled=False)
-
-
-def reset_partition(host, cluster, resource, partitions, instance):
-    """reset partition"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
-
-    data = {"command": "resetPartition",
-            "resource": resource,
-            "partition": " ".join(partitions)}
-    return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
-                                                                    instance),
-                         data)
-
-
-def reset_resource(host, cluster, resource):
-    """reset resource"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
-
-    data = {"command": "resetResource"}
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroups/{1}".format(cluster,
-                                                                   resource),
-                         data)
-
-
-def reset_instance(host, cluster, instance):
-    """reset instance"""
-    if instance not in get_instances(host, cluster):
-        raise HelixDoesNotExistException(
-            "Instance {0} does not exist".format(instance))
-
-    data = {"command": "resetInstance"}
-    return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
-                                                                    instance),
-                         data)
-
-
-def add_instance_tag(host, cluster, instance, tag):
-    """add tag to an instance"""
-    data = {"command": "addInstanceTag",
-            "instanceGroupTag": tag}
-    return _post_payload(host,
-                         "/clusters/{0}/instances/{1}".format(
-                             cluster, instance), data)
-
-
-def del_instance_tag(host, cluster, instance, tag):
-    """remove tag from instance"""
-    data = {"command": "removeInstanceTag",
-            "instanceGroupTag": tag}
-    return _post_payload(host,
-                         "/clusters/{0}/instances/{1}".format(
-                             cluster, instance), data)
-
-
-def add_resource_tag(host, cluster, resource, tag):
-    """add tag to resource group"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
-
-    data = {"command": "addResourceProperty",
-            "INSTANCE_GROUP_TAG": tag}
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroups/{1}/idealState".format(
-                             cluster, resource), data)
-
-
-"""
-del resource currently does not exist in helix api
-def del_resource_tag(host, cluster, resource, tag):
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
-
-    data = {"command": "removeResourceProperty",
-            "INSTANCE_GROUP_TAG": tag}
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroups/{1}/idealState".format(
-                             cluster, resource), data)
-"""
-
-
-def get_instance_taginfo(host, cluster):
-    return _get_page(host, "/clusters/{0}/instances".format(
-        cluster))["tagInfo"]
-
-
-def expand_cluster(host, cluster):
-    """expand cluster"""
-    data = {"command": "expandCluster"}
-
-    return _post_payload(host, "/clusters/{0}/".format(cluster), data)
-
-
-def expand_resource(host, cluster, resource):
-    """expand resource"""
-    data = {"command": "expandResource"}
-
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroup/{1}/idealState".format(
-                             cluster, resource), data)
-
-
-def add_resource_property(host, cluster, resource, properties):
-    """add resource property properties must be a dictionary of properties"""
-    properties["command"] = "addResourceProperty"
-
-    return _post_payload(host,
-                         "/clusters/{0}/resourceGroup/{1}/idealState".format(
-                             cluster, resource), properties)
-
-
-def _handle_config(host, cluster, configs, command, participant=None,
-                   resource=None):
-    """helper function to set or delete configs in helix"""
-    data = {"command": "{0}Config".format(command),
-            "configs": ",".join(
-                ["{0}={1}".format(x, y) for x, y in configs.items()])}
-
-    address = "/clusters/{0}/configs/".format(cluster)
-    if participant:
-        address += "participant/{0}".format(participant)
-    elif resource:
-        address += "resource/{0}".format(resource)
-    else:
-        address += "cluster"
-
-    return _post_payload(host, address, data)
-
-
-def set_config(host, cluster, configs, participant=None, resource=None):
-    """sets config in helix"""
-    return _handle_config(host, cluster, configs, "set", participant, resource)
-
-
-def remove_config(host, cluster, configs, participant=None, resource=None):
-    """sets config in helix"""
-    return _handle_config(host, "remove", cluster, configs, participant,
-                          resource)
-
-
-def get_zk_path(host, path):
-    """get zookeeper path"""
-    return _get_page(host, "zkPath/{0}".format(path))
-
-
-def del_zk_path(host, path):
-    """delete zookeeper path"""
-    return _delete_page(host, "zkPath/{0}".format(path))
-
-
-def get_zk_child(host, path):
-    """get zookeeper child"""
-    return _get_page(host, "zkChild/{0}".format(path))
-
-
-def del_zk_child(host, path):
-    """delete zookeeper child"""
-    return _delete_page(host, "zkChild/{0}".format(path))
-
-
-def add_state_model(host, cluster, newstate):
-    """add state model"""
-    data = {"command": "addStateModel"}
+    def add_instance(self, cluster, instances, port):
+        """add a list of instances to a cluster"""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist".format(cluster))
+
+        if not isinstance(instances, list):
+            instances = [instances]
+        instances = ["{0}:{1}".format(instance, port) for instance in instances]
+        try:
+            newinstances = set(instances)
+            oldinstances = set(
+                [x["id"].replace('_', ':') for x in self.get_instances(cluster)])
+            instances = list(newinstances - oldinstances)
+        except HelixException:
+            # this will get thrown if instances is empty,
+            # which if we're just populating should happen
+            pass
+
+        if instances:
+            data = {"command": "addInstance",
+                    "instanceNames": ";".join(instances)}
+
+            instance_path = "/clusters/{0}/instances".format(cluster)
+            # print "adding to", instance_path
+            page = self._post_payload(instance_path, data)
+            return page
+
+        else:
+            raise HelixAlreadyExistsException(
+                "All instances given already exist in cluster")
+
+    def rebalance(self, cluster, resource, replicas, key=""):
+        """rebalance the given resource group"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+
+        data = {"command": "rebalance",
+                "replicas": replicas}
+
+        if key:
+            data["key"] = key
+        page = self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format(
+                                 cluster, resource), data)
+        return page
 
-    return _post_payload(host, "/clusters/{0}/StateModelDefs".format(cluster),
-                         data, newStateModelDef=newstate)
+    def activate_cluster(self, cluster, grand_cluster, enabled=True):
+        """activate the cluster with the grand cluster"""
+        if grand_cluster not in self.get_clusters():
+            raise HelixException(
+                "grand cluster {0} does not exist".format(grand_cluster))
 
+        data = {'command': 'activateCluster',
+                'grandCluster': grand_cluster}
 
-def del_instance(host, cluster, instance):
-    """delete instance"""
-    if instance not in [x["id"] for x in get_instances(host, cluster)]:
-        raise HelixDoesNotExistException(
-            "Instance {0} does not exist.".format(instance))
+        if enabled:
+            data["enabled"] = "true"
+        else:
+            data["enabled"] = "false"
 
-    page = _delete_page(host,
-                        "/clusters/{0}/instances/{1}".format(cluster,
-                                                             instance))
-    return page
+        page = self._post_payload("/clusters/{0}".format(cluster), data)
+        return page
 
+    def deactivate_cluster(self, cluster, grand_cluster):
+        """deactivate the cluster with the grand cluster"""
+        return activate_cluster(cluster, grand_cluster, enabled=False)
+
+    def add_resource(self, cluster, resource, partitions, state_model_def, mode=""):
+        """Add given resource group"""
+        if resource in self.get_resource_groups(cluster):
+            raise HelixAlreadyExistsException(
+                "ResourceGroup {0} already exists".format(resource))
+
+        data = {"command": "addResource",
+                "resourceGroupName": resource,
+                "partitions": partitions,
+                "stateModelDefRef": state_model_def}
+
+        if mode:
+            data["mode"] = mode
+
+        return self._post_payload("/clusters/{0}/resourceGroups".format(cluster),
+                             data)
+
+    def enable_resource(self, cluster, resource, enabled=True):
+        """enable or disable specified resource"""
+        data = {"command": "enableResource"}
+        if enabled:
+            data["enabled"] = "true"
+        else:
+            data["enabled"] = "false"
+
+        return self._post_payload("/clusters/{0}/resourceGroups/{1}".format(
+            cluster, resource), data)
+
+    def disable_resource(self, cluster, resource):
+        """function for disabling resources"""
+        return enable_resource(cluster, resource, enabled=False)
+
+    def alter_ideal_state(self, cluster, resource, newstate):
+        """alter ideal state"""
+        data = {"command": "alterIdealState"}
+        return self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format(
+                                 cluster, resource), data,
+                             newIdealState=newstate)
+
+    def enable_instance(self, cluster, instance, enabled=True):
+        """enable instance within cluster"""
+        data = {"command": "enableInstance"}
+        if enabled:
+            data["enabled"] = "true"
+        else:
+            data["enabled"] = "false"
+
+        return self._post_payload("/clusters/{0}/instances/{1}".format(cluster,
+                                                                        instance),
+                             data)
+
+    def disable_instance(self, cluster, instance):
+        """wrapper for ease of use for disabling an instance"""
+        return enable_instance(cluster, instance, enabled=False)
+
+    def swap_instance(self, cluster, old, new):
+        """swap instance"""
+        data = {"command": "swapInstance",
+                "oldInstance": old,
+                "newInstance": new}
+
+        return self._post_payload("/cluster/{0}/instances".format(cluster), data)
+
+    def enable_partition(self, cluster, resource, partition, instance,
+                         enabled=True):
+        """enable Partition """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        data = {"command": "enablePartition",
+                "resource": resource,
+                "partition": partition,
+                "enabled": enabled}
+        return self._post_payload("/clusters/{0}/instances/{1}".format(cluster,
+                                                                        instance),
+                             data)
+
+    def disable_partition(self, cluster, resource, partitions, instance):
+        """disable Partition """
+        return enable_partition(cluster, resource, partitions, instance,
+                                enabled=False)
+
+    def reset_partition(self, cluster, resource, partitions, instance):
+        """reset partition"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        data = {"command": "resetPartition",
+                "resource": resource,
+                "partition": " ".join(partitions)}
+        return self._post_payload("/clusters/{0}/instances/{1}".format(cluster,
+                                                                        instance),
+                             data)
+
+    def reset_resource(self, cluster, resource):
+        """reset resource"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        data = {"command": "resetResource"}
+        return self._post_payload("/clusters/{0}/resourceGroups/{1}".format(cluster,
+                                                                       resource),
+                             data)
+
+    def reset_instance(self, cluster, instance):
+        """reset instance"""
+        if instance not in self.get_instances(cluster):
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist".format(instance))
+
+        data = {"command": "resetInstance"}
+        return self._post_payload("/clusters/{0}/instances/{1}".format(cluster,
+                                                                        instance),
+                             data)
+
+    def add_instance_tag(self, cluster, instance, tag):
+        """add tag to an instance"""
+        data = {"command": "addInstanceTag",
+                "instanceGroupTag": tag}
+        return self._post_payload("/clusters/{0}/instances/{1}".format(
+                                 cluster, instance), data)
+
+    def del_instance_tag(self, cluster, instance, tag):
+        """remove tag from instance"""
+        data = {"command": "removeInstanceTag",
+                "instanceGroupTag": tag}
+        return self._post_payload("/clusters/{0}/instances/{1}".format(
+                                 cluster, instance), data)
+
+    def add_resource_tag(self, cluster, resource, tag):
+        """add tag to resource group"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        data = {"command": "addResourceProperty",
+                "INSTANCE_GROUP_TAG": tag}
+        return self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format(
+                                 cluster, resource), data)
 
-def del_resource(host, cluster, resource):
-    """delete specified resource from cluster"""
-    if resource not in get_resource_groups(host, cluster):
-        raise HelixDoesNotExistException(
-            "ResourceGroup {0} does not exist".format(resource))
+    """
+    del resource currently does not exist in helix api
+    def del_resource_tag(self, cluster, resource, tag):
+        if resource not in self.get_resource_groups(host, cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        data = {"command": "removeResourceProperty",
+                "INSTANCE_GROUP_TAG": tag}
+        return _post_payload(host,
+                             "/clusters/{0}/resourceGroups/{1}/idealState".format(
+                                 cluster, resource), data)
+    """
 
-    page = _delete_page(host, "/clusters/{0}/resourceGroups/{1}".format(
-        cluster, resource))
-    return page
+    def get_instance_taginfo(self, cluster):
+        return self._get_page("/clusters/{0}/instances".format(
+            cluster))["tagInfo"]
+
+    def expand_cluster(self, cluster):
+        """expand cluster"""
+        data = {"command": "expandCluster"}
+        return self._post_payload("/clusters/{0}/".format(cluster), data)
+
+    def expand_resource(self, cluster, resource):
+        """expand resource"""
+        data = {"command": "expandResource"}
+
+        return self._post_payload("/clusters/{0}/resourceGroup/{1}/idealState".format(
+                                 cluster, resource), data)
+
+    def add_resource_property(self, cluster, resource, properties):
+        """add resource property properties must be a dictionary of properties"""
+        properties["command"] = "addResourceProperty"
+
+        return self._post_payload("/clusters/{0}/resourceGroup/{1}/idealState".format(
+                                 cluster, resource), properties)
+
+    def _handle_config(self, cluster, configs, command, participant=None,
+                       resource=None):
+        """helper function to set or delete configs in helix"""
+        data = {"command": "{0}Config".format(command),
+                "configs": ",".join(
+                    ["{0}={1}".format(x, y) for x, y in configs.items()])}
+
+        address = "/clusters/{0}/configs/".format(cluster)
+        if participant:
+            address += "participant/{0}".format(participant)
+        elif resource:
+            address += "resource/{0}".format(resource)
+        else:
+            address += "cluster"
+
+        return self._post_payload(address, data)
+
+    def set_config(self, cluster, configs, participant=None, resource=None):
+        """sets config in helix"""
+        return self._handle_config(cluster, configs, "set", participant, resource)
+
+    def remove_config(self, cluster, configs, participant=None, resource=None):
+        """sets config in helix"""
+        return self._handle_config(host, "remove", cluster, configs, participant,
+                              resource)
+
+    def get_zk_path(self, path):
+        """get zookeeper path"""
+        return self._get_page("zkPath/{0}".format(path))
+
+    def del_zk_path(self, path):
+        """delete zookeeper path"""
+        return self._delete_page("zkPath/{0}".format(path))
+
+    def get_zk_child(self, path):
+        """get zookeeper child"""
+        return self._get_page("zkChild/{0}".format(path))
+
+    def del_zk_child(self, path):
+        """delete zookeeper child"""
+        return self._delete_page("zkChild/{0}".format(path))
+
+    def add_state_model(self, cluster, newstate):
+        """add state model"""
+        data = {"command": "addStateModel"}
+
+        return self._post_payload("/clusters/{0}/StateModelDefs".format(cluster),
+                             data, newStateModelDef=newstate)
+
+    def del_instance(self, cluster, instance):
+        """delete instance"""
+        if instance not in [x["id"] for x in self.get_instances(cluster)]:
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+        page = self._delete_page("/clusters/{0}/instances/{1}".format(cluster,
+                                                                 instance))
+        return page
 
+    def del_resource(self, cluster, resource):
+        """delete specified resource from cluster"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
 
-def del_cluster(host, cluster):
-    """delete cluster"""
-    page = _delete_page(host, "/clusters/{0}".format(cluster))
+        page = self._delete_page("/clusters/{0}/resourceGroups/{1}".format(
+            cluster, resource))
+        return page
 
-    return page
+    def del_cluster(self, cluster):
+        """delete cluster"""
+        page = self._delete_page("/clusters/{0}".format(cluster))
 
+        return page
 
-def send_message(host, cluster, path, **kwargs):
-    pass
+    def send_message(self, cluster, path, **kwargs):
+        pass

http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/statemodeldefs.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/statemodeldefs.py b/contributors/py-helix-admin/helix/statemodeldefs.py
new file mode 100644
index 0000000..8446ae9
--- /dev/null
+++ b/contributors/py-helix-admin/helix/statemodeldefs.py
@@ -0,0 +1,39 @@
+from ordereddict import OrderedDict
+
+# These essentially come from the java classes defined here. It is cheesey and should probably come from a configuration file.
+# https://github.com/linkedin/helix/blob/master/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java
+
+LEADER_STANDBY_STATE_DEF = OrderedDict()
+LEADER_STANDBY_STATE_DEF["id"] = "LeaderStandby"
+MAP_FIELDS = OrderedDict()
+LEADER_STANDBY_STATE_DEF["mapFields"] = MAP_FIELDS
+MAP_FIELDS["DROPPED.meta"] = { "count" : "-1" }
+MAP_FIELDS["LEADER.meta"] = { "count" : "1" }
+LEADER_NEXT = OrderedDict()
+MAP_FIELDS["LEADER.next"] = LEADER_NEXT
+LEADER_NEXT["DROPPED"] = "STANDBY"
+LEADER_NEXT["STANDBY"] = "STANDBY"
+LEADER_NEXT["OFFLINE"] = "STANDBY"
+MAP_FIELDS["OFFLINE.meta"] = { "count" : "-1" }
+OFFLINE_NEXT = OrderedDict()
+MAP_FIELDS["OFFLINE.next"] = OFFLINE_NEXT
+OFFLINE_NEXT["LEADER"] = "STANDBY"
+OFFLINE_NEXT["DROPPED"] = "DROPPED"
+OFFLINE_NEXT["STANDBY"] = "STANDBY"
+MAP_FIELDS["STANDBY.meta"] = { "count" : "R" }
+STANDBY_NEXT = OrderedDict()
+MAP_FIELDS["STANDBY.next"] = STANDBY_NEXT
+STANDBY_NEXT["LEADER"] = "LEADER"
+STANDBY_NEXT["DROPPED"] = "OFFLINE"
+STANDBY_NEXT["OFFLINE"] = "OFFLINE"
+LIST_FIELDS = OrderedDict()
+LEADER_STANDBY_STATE_DEF["listFields"] = LIST_FIELDS
+LIST_FIELDS["STATE_PRIORITY_LIST"] = [ "LEADER", "STANDBY", "OFFLINE", "DROPPED" ]
+LIST_FIELDS["STATE_TRANSITION_PRIORITYLIST"] = [ "LEADER-STANDBY", "STANDBY-LEADER", "OFFLINE-STANDBY", "STANDBY-OFFLINE", "OFFLINE-DROPPED" ]
+SIMPLE_FIELDS = OrderedDict()
+LEADER_STANDBY_STATE_DEF["simpleFields"] = SIMPLE_FIELDS
+SIMPLE_FIELDS["INITIAL_STATE"] = "OFFLINE"
+
+STATE_DEF_MAP = {
+    "LeaderStandby": LEADER_STANDBY_STATE_DEF
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/test/test_helix.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/test/test_helix.py b/contributors/py-helix-admin/helix/test/test_helix.py
new file mode 100755
index 0000000..f466086
--- /dev/null
+++ b/contributors/py-helix-admin/helix/test/test_helix.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python2.6
+
+from helixexceptions import HelixException
+from helixexceptions import HelixAlreadyExistsException
+from helixexceptions import HelixDoesNotExistException
+
+from participant import Participant
+from partition import Partition
+from resourcegroup import ResourceGroup
+
+from functions import RestHelixFunctions
+from zkfunctions import ZookeeperHelixFunctions
+
+from cluster import ZKCluster
+from cluster import Cluster
+
+import pytest
+import random
+
+INSTANCE_ID = "fake_12345"
+INSTANCE_NAME = "fake"
+INSTANCE_PORT = 12345
+PARTITION_COUNT = 5
+REPLICA_COUNT = 1
+STATEMODELDEF = "LeaderStandby"
+REBALANCE_MODE = "FULL_AUTO"
+RESOURCE_NAME = "fake_resource"
+TAG_NAME = "fake_tag"
+
+CLUSTER_ID = "helix_{id}"
+ZOOKEEPER_ROOT = "/testing_helix"
+ZOOKEEPER_HOST = "localhost:2181"
+REST_HOST = "localhost:8100"
+
+class TestHelixAdmin(object):
+    #@pytest.mark.int
+    def test_zookeeper_cluster(self):
+        cluster = ZKCluster(ZOOKEEPER_HOST, ZOOKEEPER_ROOT, self._get_cluster_name())
+        self._cluster_actions(cluster)
+
+    #@pytest.mark.int
+    def test_rest_cluster(self):
+        cluster = Cluster(REST_HOST, self._get_cluster_name())
+        self._cluster_actions(cluster)
+
+    def _get_cluster_name(self):
+        return CLUSTER_ID.format(id=random.randint(1, 1000000))
+
+    def _cluster_actions(self, cluster):
+        cluster.add_cluster()
+        cluster.add_resource(RESOURCE_NAME, PARTITION_COUNT, STATEMODELDEF, mode=REBALANCE_MODE)
+        cluster.add_instance(INSTANCE_NAME, INSTANCE_PORT)
+        cluster.add_instance_tag(INSTANCE_ID, TAG_NAME)
+        cluster.add_resource_tag(RESOURCE_NAME, TAG_NAME)
+        cluster.rebalance(RESOURCE_NAME, REPLICA_COUNT)
+        participant = cluster.participants.get(INSTANCE_ID)
+        cluster.del_instance(participant)
+        resource = cluster.resources.get(RESOURCE_NAME)
+        cluster.del_resource(resource)
+        cluster.del_cluster()

http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/zkfunctions.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/zkfunctions.py b/contributors/py-helix-admin/helix/zkfunctions.py
new file mode 100644
index 0000000..aed2b74
--- /dev/null
+++ b/contributors/py-helix-admin/helix/zkfunctions.py
@@ -0,0 +1,522 @@
+"""library to handle helix commands"""
+# XXX: Explore using zookeeper transactions for some of the write operations into zookeeper.
+# Currently, it looks like ensure_path and create with the make_path argument are not supported in transactions so it isn't usable out of the box.
+import json
+from ordereddict import OrderedDict
+from kazoo.client import KazooClient
+
+from statemodeldefs import STATE_DEF_MAP
+from helixexceptions import HelixException
+from helixexceptions import HelixAlreadyExistsException
+from helixexceptions import HelixDoesNotExistException
+from kazoo.exceptions import NodeExistsError
+
+RESOURCE_MODES = ["FULL_AUTO", "CUSTOMIZED", "SEMI_AUTO", "USER_DEFINED"]
+
+IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES"
+RESOURCE_IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES/{resourceName}"
+EXTERNAL_VIEW_STATE_PATH = "/{clusterName}/EXTERNALVIEW/{resourceName}"
+INSTANCE_PATH = "/{clusterName}/INSTANCES"
+PARTICIPANT_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT/{instanceName}"
+PARTICIPANTS_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT"
+CLUSTER_CONFIG_PATH = "/{clusterName}/CONFIGS/CLUSTER/{clusterName}"
+CONFIG_PATH = "/{clusterName}/CONFIGS/{configName}/{entityName}"
+STATE_MODEL_DEF_PATH = "/{clusterName}/STATEMODELDEFS/{stateModelName}"
+LIVE_INSTANCE_PATH = "/{clusterName}/LIVEINSTANCES/{instanceName}"
+
+HELIX_ZOOKEEPER_PATHS = {
+    "cluster": [
+        "/{clusterName}/CONFIGS",
+        "/{clusterName}/CONFIGS/RESOURCE",
+        "/{clusterName}/CONFIGS/CLUSTER",
+        PARTICIPANTS_CONFIG_PATH,
+        "/{clusterName}/LIVEINSTANCES",
+        INSTANCE_PATH,
+        IDEAL_STATE_PATH,
+        #"/{clusterName}/RESOURCEASSIGNMENTS",
+        "/{clusterName}/EXTERNALVIEW",
+        "/{clusterName}/STATEMODELDEFS",
+        "/{clusterName}/CONTROLLER",
+        "/{clusterName}/CONTROLLER/HISTORY",
+        "/{clusterName}/CONTROLLER/ERRORS",
+        "/{clusterName}/CONTROLLER/MESSAGES",
+        "/{clusterName}/CONTROLLER/STATUSUPDATES",
+        "/{clusterName}/PROPERTYSTORE",
+    ],
+    "resource": [
+        RESOURCE_IDEAL_STATE_PATH,
+        "/{clusterName}/RESOURCEASSIGNMENTS/{resourceName}",
+        EXTERNAL_VIEW_STATE_PATH
+    ],
+    "instance": [
+        #"/{clusterName}/LIVEINSTANCES/{instanceName}",
+        "/{clusterName}/INSTANCES/{instanceName}",
+        "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES",
+        "/{clusterName}/INSTANCES/{instanceName}/ERRORS",
+        "/{clusterName}/INSTANCES/{instanceName}/STATUSUPDATES",
+        "/{clusterName}/INSTANCES/{instanceName}/MESSAGES"
+    ],
+    "statemodel": [
+        STATE_MODEL_DEF_PATH
+    ]
+}
+
+CLUSTER_CONFIG_TEMPLATE = OrderedDict()
+CLUSTER_CONFIG_TEMPLATE["id"] = "{clusterName}"
+CLUSTER_CONFIG_TEMPLATE["mapFields"] = {}
+CLUSTER_CONFIG_TEMPLATE["listFields"] = {}
+CLUSTER_CONFIG_TEMPLATE["simpleFields"] = {"allowParticipantAutoJoin": "true"}
+
+
+class ZookeeperHelixFunctions(object):
+    """Zookeeper based client to manage helix clusters"""
+    def __init__(self, zookeeper_connect_string, zk_root):
+        """Constructor."""
+        self.zk = KazooClient(hosts=zookeeper_connect_string)
+        self.zk.start()
+        self.zk_root = zk_root
+
+    def _list_path(self, path):
+        """List a zookeeper path."""
+        return self.zk.get_children(path)
+
+    def _is_valid_cluster(self, cluster):
+        """Validate cluster configuration."""
+        for path in HELIX_ZOOKEEPER_PATHS.get("cluster"):
+            full_path = self._build_path(path.format(clusterName=cluster))
+            if not self.zk.exists(full_path):
+                return False
+        return True
+
+    def _build_path(self, path):
+        """Construct zookeeper path."""
+        return "".join([self.zk_root, path])
+
+    @classmethod
+    def _build_instance_entry(cls, instance, enabled="true"):
+        """Create the data entry for an instance."""
+        host, port = instance.split(":")
+        instance_data = OrderedDict()
+        instance_data["id"] = "{host}_{port}".format(host=host, port=port)
+        instance_data["listFields"] = {}
+        instance_data["mapFields"] = {}
+        instance_data["simpleFields"] = OrderedDict()
+        instance_data["simpleFields"]["HELIX_ENABLED"] = enabled
+        instance_data["simpleFields"]["HELIX_HOST"] = host
+        instance_data["simpleFields"]["HELIX_PORT"] = port
+        return instance_data
+
+    def create_root(self):
+        """Initialize zookeeper root"""
+        path = self._build_path("")
+        if not self.zk.exists(path):
+            self.zk.create(path)
+        return True
+
+    def get_clusters(self):
+        """ querys helix cluster for all clusters """
+        if self.zk.exists(self.zk_root):
+            return [ cluster for cluster in self._list_path(self.zk_root) if self._is_valid_cluster(cluster) ]
+        else:
+            return []
+
+    def get_resource_groups(self, cluster):
+        """ querys helix cluster for resources groups of the current cluster"""
+        return self._list_path(self._build_path(IDEAL_STATE_PATH.format(clusterName=cluster)))
+
+    def get_resource_tags(self, cluster):
+        """returns a dict of resource tags for a cluster"""
+        resource_tags = {}
+        for resource in self.get_resource_groups(cluster):
+            resource_data, resource_meta = self._get_resource_group(cluster, resource)
+            tag = resource_data.get("INSTANCE_GROUP_TAG")
+            if tag:
+                resource_tags[tag] = [resource]
+
+        return resource_tags
+
+    def _get_resource_group(self, cluster, resource):
+        """ gets the ideal state of the specified resource group of the
+        current cluster"""
+
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{resource} is not a resource group of {cluster}".format(resource=resource, cluster=cluster))
+
+        data, stat = self.zk.get(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)))
+        return (json.loads(data), stat)
+
+    def get_resource_group(self, cluster, resource):
+        """ COMPAT: gets the ideal state of the specified resource group of the
+        current cluster"""
+
+        return self._get_resource_group(cluster, resource)[0]
+
+    def _get_ideal_state(self, cluster, resource):
+        """ gets the ideal state of the specified resource group of the
+        current cluster"""
+
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+
+        return self._get_resource_group(cluster, resource)["mapFields"]
+
+    def get_ideal_state(self, cluster, resource):
+        """ COMPAT: gets the ideal state of the specified resource group of the
+        current cluster"""
+
+        return self._get_ideal_state(cluster, resource)[0]
+
+    def _get_external_view(self, cluster, resource):
+        """return the external view for a given cluster and resource"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+        data, stat = self.zk.get(self._build_path(EXTERNAL_VIEW_STATE_PATH.format(clusterName=cluster, resourceName=resource)))
+        return (json.loads(data)["mapFields"], stat)
+
+    def get_external_view(self, cluster, resource):
+        """ COMPAT: return the external view for a given cluster and resource"""
+        return self._get_external_view(cluster, resource)[0]
+
+    def get_instances(self, cluster):
+        """get list of instances registered to the cluster"""
+        if not cluster:
+            raise HelixException("Cluster must be set before "
+                                 "calling this function")
+
+        instances = []
+        for instance in self._list_path(self._build_path(PARTICIPANTS_CONFIG_PATH.format(clusterName=cluster))):
+            instance_data = json.loads(self.zk.get(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance)))[0])
+            if self.zk.exists(self._build_path(LIVE_INSTANCE_PATH.format(clusterName=cluster, instanceName=instance))):
+                instance_data["simpleFields"]["Alive"] = "true"
+            else:
+                instance_data["simpleFields"]["Alive"] = "false"
+            instances.append(instance_data)
+        return instances
+
+    def _get_instance_detail(self, cluster, instance):
+        """get details of an instance"""
+        data, stat = self.zk.get(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance)))
+        return (json.loads(data), stat)
+
+    def get_instance_detail(self, cluster, instance):
+        """ COMPAT: get details of an instance"""
+        return self._get_instance_detail(cluster, instance)[0]
+
+    def _get_config(self, cluster, config, entity):
+        """get requested config"""
+        data, stat = self.zk.get(self._build_path(CONFIG_PATH.format(clusterName=cluster, configName=config, entityName=entity)))
+        return (json.loads(data), stat)
+
+    def get_config(self, cluster, config, entity):
+        """ COMPAT: get requested config"""
+        return self._get_config(cluster, config, entity)[0]
+
+    def add_cluster(self, cluster):
+        """add a cluster to helix"""
+        if cluster in self.get_clusters():
+            raise HelixAlreadyExistsException(
+                "Cluster {0} already exists".format(cluster))
+
+        for path in HELIX_ZOOKEEPER_PATHS.get("cluster"):
+            self.zk.ensure_path(self._build_path(path.format(clusterName=cluster)))
+
+        data = CLUSTER_CONFIG_TEMPLATE
+        data["id"] = cluster
+
+        try:
+            self.zk.create(self._build_path(CLUSTER_CONFIG_PATH.format(clusterName=cluster)), json.dumps(data))
+        except NodeExistsError:
+            # Ignore existing cluster
+            pass
+
+        # Insert state defs if they don't exist
+        for state_def in STATE_DEF_MAP:
+            if not self.zk.exists(self._build_path(STATE_MODEL_DEF_PATH.format(clusterName=cluster, stateModelName=state_def))):
+                self.zk.create(self._build_path(STATE_MODEL_DEF_PATH.format(clusterName=cluster, stateModelName=state_def)), json.dumps(STATE_DEF_MAP[state_def]))
+
+        return True
+
+    def add_instance(self, cluster, instances, port):
+        """add a list of instances to a cluster"""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist".format(cluster))
+
+        if not isinstance(instances, list):
+            instances = [instances]
+        instances = ["{instance}:{port}".format(instance=instance, port=port) for instance in instances]
+        try:
+            newinstances = set(instances)
+            oldinstances = set(
+                [x["id"].replace('_', ':') for x in self.get_instances(cluster)])
+            instances = list(newinstances - oldinstances)
+        except HelixException:
+            # this will get thrown if instances is empty,
+            # which if we're just populating should happen
+            pass
+
+        if instances:
+            for instance in instances:
+                data = self._build_instance_entry(instance)
+                self.zk.create(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance.replace(':', '_'))), json.dumps(data))
+                for path in HELIX_ZOOKEEPER_PATHS.get("instance"):
+                    self.zk.ensure_path(self._build_path(path.format(clusterName=cluster, instanceName=instance.replace(':', '_'))))
+            return True
+        else:
+            raise HelixAlreadyExistsException(
+                "All instances given already exist in cluster")
+
+
+    def rebalance(self, cluster, resource, replicas, key=""):
+        """rebalance the given resource group"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+
+        # TODO: key usage is currently not supported.
+        if not key == "":
+            raise NotImplementedError
+
+        resource_data, resource_meta = self._get_resource_group(cluster, resource)
+        resource_data["simpleFields"]["REPLICAS"] = replicas
+        self.zk.set(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(resource_data))
+
+        return True
+
+    def activate_cluster(self, cluster, grand_cluster, enabled=True):
+        """activate the cluster with the grand cluster"""
+        if grand_cluster not in self.get_clusters():
+            raise HelixException(
+                "grand cluster {0} does not exist".format(grand_cluster))
+
+        raise NotImplementedError
+
+    def deactivate_cluster(self, cluster, grand_cluster):
+        """deactivate the cluster with the grand cluster"""
+        return self.activate_cluster(cluster, grand_cluster, enabled=False)
+
+
+    def add_resource(self, cluster, resource, partitions,
+                     state_model_def, mode="", state_model_factory_name="DEFAULT"):
+        """Add given resource group"""
+        if resource in self.get_resource_groups(cluster):
+            raise HelixAlreadyExistsException(
+                "ResourceGroup {0} already exists".format(resource))
+
+        data = {"id": resource,
+                "mapFields": {},
+                "listFields": {},
+                "simpleFields": {
+                    "IDEAL_STATE_MODE": "AUTO",
+                    "NUM_PARTITIONS": partitions,
+                    "REBALANCE_MODE": mode,
+                    "REPLICAS": "0",
+                    "STATE_MODEL_DEF_REF": state_model_def,
+                    "STATE_MODEL_FACTORY_NAME": state_model_factory_name
+                    }
+               }
+
+        if mode:
+            if mode in RESOURCE_MODES:
+                data["mode"] = mode
+            else:
+                raise ValueError("Invalid mode ({mode})".format(mode=mode))
+
+        self.zk.create(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(data))
+        return True
+
+    def enable_resource(self, cluster, resource, enabled=True):
+        """enable or disable specified resource"""
+        raise NotImplementedError
+
+    def disable_resource(self, cluster, resource):
+        """function for disabling resources"""
+        return self.enable_resource(cluster, resource, enabled=False)
+
+    def alter_ideal_state(self, cluster, resource, newstate):
+        """alter ideal state"""
+        raise NotImplementedError
+
+    def enable_instance(self, cluster, instance, enabled=True):
+        """enable instance within cluster"""
+        raise NotImplementedError
+
+    def disable_instance(self, cluster, instance):
+        """wrapper for ease of use for disabling an instance"""
+        return self.enable_instance(cluster, instance, enabled=False)
+
+    def swap_instance(self, cluster, old, new):
+        """swap instance"""
+        raise NotImplementedError
+
+    def enable_partition(self, cluster, resource, partition, instance,
+                       enabled=True):
+        """enable Partition """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+        raise NotImplementedError
+
+    def disable_partition(self, cluster, resource, partitions, instance):
+        """disable Partition """
+        return self.enable_partition(cluster, resource, partitions, instance,
+                                     enabled=False)
+
+    def reset_partition(self, cluster, resource, partitions, instance):
+        """reset partition"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        raise NotImplementedError
+
+    def reset_resource(self, cluster, resource):
+        """reset resource"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        raise NotImplementedError
+
+    def reset_instance(self, cluster, instance):
+        """reset instance"""
+        if instance not in self.get_instances(cluster):
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist".format(instance))
+
+        raise NotImplementedError
+
+    def add_instance_tag(self, cluster, instance, tag):
+        """add tag to an instance"""
+        instance_data, instance_meta = self._get_instance_detail(cluster, instance)
+        instance_tags = instance_data.get("listFields").get("TAG_LIST", [])
+        if tag in instance_tags:
+            raise HelixAlreadyExistsException(
+                "Tag ({tag}) already exists for instance ({instance}).".format(tag=tag, instance=instance))
+
+        instance_tags.append(tag)
+        instance_data["listFields"]["TAG_LIST"] = instance_tags
+
+        # XXX: Apply some retry logic here
+        self.zk.set(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance)), json.dumps(instance_data), version=instance_meta.version)
+        return True
+
+    def del_instance_tag(self, cluster, instance, tag):
+        """remove tag from instance"""
+        if instance not in [x["id"] for x in self.get_instances(cluster)]:
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+    def add_resource_tag(self, cluster, resource, tag):
+        """add tag to resource group"""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        resource_data, resource_stat = self._get_resource_group(cluster, resource)
+        resource_data["simpleFields"]["INSTANCE_GROUP_TAG"] = tag
+
+        self.zk.set(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)), json.dumps(resource_data), version=resource_stat.version)
+        return True
+
+    def del_resource_tag(self, cluster, resource, tag):
+        """Delete resource tag."""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+        raise NotImplementedError
+
+    def get_instance_taginfo(self, cluster):
+        """Get resource tag info."""
+        instance_tags = {}
+        for instance in self.get_instances(cluster):
+            list_fields = instance.get("listFields")
+            if "TAG_LIST" in list_fields:
+                for tag in list_fields.get("TAG_LIST"):
+                    if tag in instance_tags:
+                        instance_tags[tag].append(instance.get("id"))
+                    else:
+                        instance_tags[tag] = [instance.get("id")]
+        return instance_tags
+
+    def expand_cluster(self, cluster):
+        """expand cluster"""
+        raise NotImplementedError
+
+    def expand_resource(self, cluster, resource):
+        """expand resource"""
+        raise NotImplementedError
+
+    def add_resource_property(self, cluster, resource, properties):
+        """Add resource property. Properties must be a dictionary of properties."""
+        raise NotImplementedError
+
+    def set_config(self, cluster, configs, participant=None, resource=None):
+        """sets config in helix"""
+        raise NotImplementedError
+
+    def remove_config(self, cluster, configs, participant=None, resource=None):
+        """sets config in helix"""
+        raise NotImplementedError
+
+    def get_zk_path(self, path):
+        """get zookeeper path"""
+        return self.zk.get(self._build_path(path))
+
+    def del_zk_path(self, path):
+        """delete zookeeper path"""
+        return self.zk.delete(self._build_path(path))
+
+    def add_state_model(self, cluster, newstate):
+        """add state model"""
+        raise NotImplementedError
+
+    def del_instance(self, cluster, instance):
+        """delete instance"""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist.".format(cluster))
+
+        if instance not in [x["id"] for x in self.get_instances(cluster)]:
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+        self.zk.delete(self._build_path(PARTICIPANT_CONFIG_PATH.format(clusterName=cluster, instanceName=instance.replace(':', '_'))))
+
+        # Reverse zookeeper structure for destruction.
+        for path in HELIX_ZOOKEEPER_PATHS.get("instance")[::-1]:
+            self.zk.delete(self._build_path(path.format(clusterName=cluster, instanceName=instance.replace(':', '_'))))
+        return True
+
+    def del_resource(self, cluster, resource):
+        """delete specified resource from cluster"""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist.".format(cluster))
+
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        self.zk.delete(self._build_path(RESOURCE_IDEAL_STATE_PATH.format(clusterName=cluster, resourceName=resource)))
+        return True
+
+    def del_cluster(self, cluster):
+        """delete cluster"""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist.".format(cluster))
+
+        self.zk.delete(self._build_path(CLUSTER_CONFIG_PATH.format(clusterName=cluster)))
+
+        for path in HELIX_ZOOKEEPER_PATHS.get("cluster")[::-1]:
+            self.zk.ensure_path(self._build_path(path.format(clusterName=cluster)))
+
+        return True
+
+    def send_message(self, cluster, path, **kwargs):
+        """Send helix IPC message."""
+        raise NotImplementedError


Mime
View raw message