Added: helix/site-content/1.0.1-docs/project-reports.html
URL: http://svn.apache.org/viewvc/helix/site-content/1.0.1-docs/project-reports.html?rev=1879414&view=auto
==============================================================================
--- helix/site-content/1.0.1-docs/project-reports.html (added)
+++ helix/site-content/1.0.1-docs/project-reports.html Wed Jul 1 21:38:35 2020
@@ -0,0 +1,246 @@

Apache Helix - Generated Reports

This document provides an overview of the various reports that are automatically generated by Maven. Each report is briefly described below.


Overview

Document | Description
Sonar | Quality analysis dashboard.

Reflow Maven skin by Andrius Velykis.

Apache Helix, Apache, the Apache feather logo, and the Apache Helix project logos are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Privacy Policy

Added: helix/site-content/1.0.1-docs/project-summary.html
URL: http://svn.apache.org/viewvc/helix/site-content/1.0.1-docs/project-summary.html?rev=1879414&view=auto
==============================================================================
--- helix/site-content/1.0.1-docs/project-summary.html (added)
+++ helix/site-content/1.0.1-docs/project-summary.html Wed Jul 1 21:38:35 2020
@@ -0,0 +1,311 @@

Apache Helix - Project Summary

Project Information

Field | Value
Name | Apache Helix :: Website :: 1.0.1
Description | Helix is a generic cluster management framework used for the automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
Homepage | http://helix.apache.org/1.0.1-docs

Project Organization

Field | Value
Name | The Apache Software Foundation
URL | http://www.apache.org/

Build Information

Field | Value
GroupId | org.apache.helix
ArtifactId | 1.0.1-docs
Version | 1.0.1-SNAPSHOT
Type | bundle
Java Version | 1.8

Added: helix/site-content/1.0.1-docs/quota_scheduling.html
URL: http://svn.apache.org/viewvc/helix/site-content/1.0.1-docs/quota_scheduling.html?rev=1879414&view=auto
==============================================================================
--- helix/site-content/1.0.1-docs/quota_scheduling.html (added)
+++ helix/site-content/1.0.1-docs/quota_scheduling.html Wed Jul 1 21:38:35 2020
@@ -0,0 +1,425 @@

Apache Helix - Quota-based Task Scheduling

Introduction


Intro


Quota-based task scheduling is a feature addition to Helix Task Framework that enables users of Task Framework to apply the notion of categories in distributed task management.


Purpose

As Helix Task Framework gains usage in other open-source frameworks such as Apache Gobblin and Apache Pinot, the types of distributed tasks it manages have also become more varied. There have also been explicit feature requests to Helix for differentiating types of tasks by assigning them corresponding quotas.

Quota-based task scheduling aims to fulfill these requests by allowing users to define a quota profile consisting of quota types and their corresponding quotas. The goal of this feature is threefold: 1) to let users prioritize one type of workflow/job/task over another, 2) to achieve isolation among types of tasks, and 3) to make monitoring easier by tracking the status of distributed execution by type.

Glossary and Definitions

  • Task Framework: a component of Apache Helix; a framework on which users can define and run workflows, jobs, and tasks in a distributed way.
  • Workflow: the largest unit of work in Task Framework. A workflow consists of one or more jobs. There are two types of workflows:
    • Generic workflow: a workflow consisting of jobs (a job DAG) used for general purposes. A generic workflow may be removed if it expires or times out.
    • Job queue: a special type of workflow consisting of jobs that tend to have a linear dependency (this dependency is configurable, however). Job queues do not expire; a job queue lives on until it is deleted.
  • Job: the second-largest unit of work in Task Framework. A job consists of one or more mutually independent tasks. There are two types of jobs:
    • Generic job: a job consisting of one or more tasks.
    • Targeted job: unlike a generic job, a targeted job must have a target resource, and its tasks are scheduled alongside the partitions of that target resource. To illustrate, an Espresso user of Task Framework may wish to schedule a backup job on one of their DBs called MemberDataDB. This DB is divided into multiple partitions (MemberDataDB_1, MemberDataDB_2, … MemberDataDB_N); suppose that a targeted job is submitted such that its tasks are paired up with each of those partitions. This “pairing-up” is necessary because each backup task must run on the same physical machine as the partition it is backing up.
  • Task: the smallest unit of work in Task Framework. A task is an independent unit of work.
  • Quota resource type: denotes a particular type of resource, such as JVM thread count, memory, or CPU. Generally, each task that runs on a Helix Participant (= instance, worker, node) occupies a set amount of resources. Note that JVM thread count is currently the only quota resource type supported by Task Framework, with each task occupying 1 of the 40 threads available per Helix Participant (instance).
  • Quota type: denotes which category a given job and its underlying tasks are classified as. For example, you may define a quota configuration with two quota types, “Backup” and “Aggregation”, plus a default type “DEFAULT”. You may prioritize the backup type by giving it a higher quota ratio, such as 20:10:10, respectively. When streams of jobs are being submitted, you can expect each Participant, assuming it has a total of 40 JVM threads, to run 20 “Backup” tasks, 10 “Aggregation” tasks, and 10 “DEFAULT” tasks. Quota types are defined and applied at the job level, meaning all tasks belonging to a job with a quota type are of that quota type. Note that if a quota type is set for a workflow, all jobs belonging to that workflow inherit the type from the workflow.
  • Quota: a number referring to a relative ratio that determines what portion of the given resources should be allotted to a particular quota type.
    • For example: TYPE_0: 40, TYPE_1: 20, …, DEFAULT: 40
  • Quota config: a set of string-integer mappings that indicate the quota resource type, quota types, and corresponding quotas. Task Framework stores the quota config in ClusterConfig.

Architecture


AssignableInstance


AssignableInstance is an abstraction that represents each live Participant that is able to take on tasks from the Controller. Each AssignableInstance will cache what tasks it has running as well as remaining task counts from the quota-based capacity calculation.
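To make this bookkeeping concrete, here is a minimal sketch of the state an AssignableInstance-like object keeps. It is hypothetical: `InstanceCapacitySketch`, `tryAssign`, `release`, and `freeThreads` are illustrative names, not the real AssignableInstance API, and the per-quota-type thread allotment is assumed to have been computed already from the quota config.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT Helix's AssignableInstance API: one live Participant's
// per-quota-type thread allotment plus the tasks it is currently running.
class InstanceCapacitySketch {
  private final String instanceName;
  // Remaining free threads per quota type, e.g. {A=20, B=10, DEFAULT=10}
  private final Map<String, Integer> remaining = new HashMap<>();
  // taskId -> quota type of each task currently assigned to this instance
  private final Map<String, String> runningTasks = new HashMap<>();

  InstanceCapacitySketch(String instanceName, Map<String, Integer> allotment) {
    this.instanceName = instanceName;
    this.remaining.putAll(allotment);
  }

  // Takes one thread of the task's quota type; returns false if none is free
  // or if the task is already assigned here (so it is never counted twice).
  boolean tryAssign(String taskId, String quotaType) {
    int free = remaining.getOrDefault(quotaType, 0);
    if (free <= 0 || runningTasks.containsKey(taskId)) {
      return false;
    }
    remaining.put(quotaType, free - 1);
    runningTasks.put(taskId, quotaType);
    return true;
  }

  // Returns the thread held by a finished task to its quota type's pool.
  void release(String taskId) {
    String quotaType = runningTasks.remove(taskId);
    if (quotaType != null) {
      remaining.merge(quotaType, 1, Integer::sum);
    }
  }

  int freeThreads(String quotaType) {
    return remaining.getOrDefault(quotaType, 0);
  }

  int runningTaskCount() {
    return runningTasks.size();
  }

  String getInstanceName() {
    return instanceName;
  }
}
```

A quota type that is absent from the allotment is treated as having zero free threads, so tasks of that type are never accepted by this sketch.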


AssignableInstanceManager


AssignableInstanceManager manages all AssignableInstances. It also serves as a connecting layer between the Controller and each AssignableInstance. AssignableInstanceManager also provides a set of interfaces that allows the Controller to easily determine whether an AssignableInstance is able to take on more tasks.
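A minimal sketch of the manager's role follows, assuming each live instance's remaining capacity is kept as a map from quota type to free threads. `InstanceManagerSketch`, `hasCapacity`, `pickInstance`, and `consume` are hypothetical names, not AssignableInstanceManager's actual interface; choosing the instance with the most free threads is one plausible policy, picked here for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, NOT Helix's AssignableInstanceManager API: tracks the remaining
// per-quota-type capacity of every live instance and answers "who can take a task?".
class InstanceManagerSketch {
  // instanceName -> (quotaType -> remaining free threads)
  private final Map<String, Map<String, Integer>> capacityByInstance = new HashMap<>();

  // Called when a Participant goes live, with its already-computed allotment.
  void addInstance(String instanceName, Map<String, Integer> allotment) {
    capacityByInstance.put(instanceName, new HashMap<>(allotment));
  }

  // Called when a Participant goes away; its capacity is no longer offered.
  void removeInstance(String instanceName) {
    capacityByInstance.remove(instanceName);
  }

  // True if at least one live instance has a free thread for this quota type.
  boolean hasCapacity(String quotaType) {
    return pickInstance(quotaType).isPresent();
  }

  // Returns the instance with the most free threads for this quota type, if any.
  Optional<String> pickInstance(String quotaType) {
    String best = null;
    int bestFree = 0;
    for (Map.Entry<String, Map<String, Integer>> e : capacityByInstance.entrySet()) {
      int free = e.getValue().getOrDefault(quotaType, 0);
      if (free > bestFree) {
        best = e.getKey();
        bestFree = free;
      }
    }
    return Optional.ofNullable(best);
  }

  // Records that one thread of the quota type is now in use on the instance.
  void consume(String instanceName, String quotaType) {
    Map<String, Integer> caps = capacityByInstance.get(instanceName);
    if (caps != null && caps.getOrDefault(quotaType, 0) > 0) {
      caps.merge(quotaType, -1, Integer::sum);
    }
  }
}
```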


TaskAssigner


The TaskAssigner interface provides basic API methods for assigning tasks based on quota constraints. Currently, Task Framework considers only the number of Participant-side JVM threads, each of which corresponds to an active task.
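The shape of such an interface can be sketched as follows. This is an illustration under stated assumptions: `TaskAssignerSketch` and `ThreadCountAssigner` are hypothetical, the real TaskAssigner signature differs, and the sketch models only the thread-count constraint described above (one JVM thread per task), handing each task to the instance with the most free threads for the task's quota type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interface in the spirit of TaskAssigner; NOT Helix's actual signature.
interface TaskAssignerSketch {
  // taskIds: tasks of one quota type waiting to be scheduled
  // freeThreads: instanceName -> free threads for that quota type
  // returns taskId -> instanceName for every task that fits; the rest stay unassigned
  Map<String, String> assign(List<String> taskIds, Map<String, Integer> freeThreads);
}

// One JVM thread per task; each task goes to the instance with the most free threads.
class ThreadCountAssigner implements TaskAssignerSketch {
  @Override
  public Map<String, String> assign(List<String> taskIds, Map<String, Integer> freeThreads) {
    // Work on a copy (the caller's map is not modified); TreeMap breaks ties by name.
    Map<String, Integer> free = new TreeMap<>(freeThreads);
    Map<String, String> assignment = new LinkedHashMap<>();
    for (String taskId : taskIds) {
      String target = null;
      int mostFree = 0;
      for (Map.Entry<String, Integer> e : free.entrySet()) {
        if (e.getValue() > mostFree) { // strict '>' keeps the alphabetically first on ties
          target = e.getKey();
          mostFree = e.getValue();
        }
      }
      if (target == null) {
        break; // no free thread left anywhere for this quota type
      }
      assignment.put(taskId, target);
      free.put(target, mostFree - 1);
    }
    return assignment;
  }
}
```

Tasks that do not fit are simply left out of the result, on the assumption that a later pass of the scheduling pipeline will retry them.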


RuntimeJobDag (JobDagIterator)


This new component serves as an iterator over job DAGs for the Controller. Previously, task assignment required the Controller to iterate through all jobs and their underlying tasks to determine whether any tasks needed to be assigned and scheduled. This proved to be inefficient and did not scale with the increasing load we were putting on Task Framework. Each RuntimeJobDag keeps track of its state; that is, it knows what needs to be offered up to the Controller for scheduling. This saves the Controller redundant computation every time it goes through the TaskSchedulingStage of the Task pipeline.
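The core idea, remembering which jobs are ready instead of rescanning every job, can be sketched as a Kahn-style topological iterator. The sketch is hypothetical (`RuntimeJobDagSketch`, `getNextJob`, and `finishJob` are illustrative names) and ignores everything the real component tracks beyond job readiness.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the idea behind RuntimeJobDag (NOT its real API): the DAG remembers
// which jobs are ready, so the Controller asks for the next job instead of rescanning all jobs.
class RuntimeJobDagSketch {
  private final Map<String, List<String>> children = new HashMap<>();
  private final Map<String, Integer> pendingParents = new HashMap<>();
  private final Deque<String> ready = new ArrayDeque<>();

  void addJob(String job) {
    children.putIfAbsent(job, new ArrayList<>());
    pendingParents.putIfAbsent(job, 0);
  }

  // Declares that child may only start after parent finishes. Call before start().
  void addDependency(String parent, String child) {
    addJob(parent);
    addJob(child);
    children.get(parent).add(child);
    pendingParents.merge(child, 1, Integer::sum);
  }

  // Seeds the ready queue with jobs that have no parents. Call once.
  void start() {
    for (Map.Entry<String, Integer> e : pendingParents.entrySet()) {
      if (e.getValue() == 0) {
        ready.add(e.getKey());
      }
    }
  }

  // Returns the next job whose parents have all finished, or null if none is ready yet.
  String getNextJob() {
    return ready.poll();
  }

  // Marks a job finished and releases any children whose parents are now all done.
  void finishJob(String job) {
    for (String child : children.getOrDefault(job, new ArrayList<>())) {
      if (pendingParents.merge(child, -1, Integer::sum) == 0) {
        ready.add(child);
      }
    }
  }
}
```

Because a job is only offered once all of its parents have finished, each call does work proportional to the finished job's children rather than to the size of the whole DAG.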


[Figure: Architecture]


User Manual


How it works


Quota-based task scheduling works as follows. If a quota type is set, Task Framework calculates each quota type's ratio as its quota config number divided by the sum of all quota config numbers. It then applies that ratio to find the actual resource amount allotted to each quota type. To illustrate, suppose the quota config is as follows:

"QUOTA_TYPES":{
  "A":"2"
  ,"B":"1"
  ,"DEFAULT":"1"
}

Based on these raw numbers, Task Framework will compute the ratios. With the ratios, Task Framework will apply them to find the actual resource amount per quota type. The following table summarizes these calculations with the assumption of 40 JVM threads per instance:

Quota Type | Quota Config | Ratio | Actual Resource Allotted (# of JVM Threads)
A | 2 | 50% | 20
B | 1 | 25% | 10
DEFAULT | 1 | 25% | 10

Every instance (node) will have a quota profile that looks like this. This has a few implications. First, this allows for prioritization of certain jobs by allotting a greater amount of resources to corresponding quota types. In that sense, you may treat quota config numbers/ratios as user-defined priority values. More specifically, take the quota profile in the example above. In this case, when there are 100 jobs submitted for each quota type, jobs of type A will finish faster; in other words, quota type A will see twice as much throughput when there is a continuous stream of jobs due to its quota ratio being twice that of other quota types.


Quota-based task scheduling also allows for isolation/compartmentalization in scheduling jobs. Suppose there are two categories of jobs: the first category consists of urgent jobs that are short-lived but need to run right away, while jobs in the second category tend to take longer but are not urgent and can take their time running. Previously, these two types of jobs would get assigned, scheduled, and run in a mix, making it difficult to ensure that jobs in the first category were processed urgently. Quota-based scheduling solves this problem by allowing the user to create quota types that model “categories” with different characteristics and requirements.
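The ratio arithmetic described at the start of this section can be sketched as follows. This is a minimal illustration, not Helix's implementation: it assumes integer division (rounding down), so when the ratios do not divide the thread capacity evenly, a few threads remain unallotted; Helix's actual rounding behavior may differ. With the example config and 40 threads, it reproduces the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the quota ratio arithmetic; NOT Helix's actual implementation.
class QuotaAllotmentSketch {
  // Converts raw quota config numbers into per-type thread counts for one Participant.
  // Integer division rounds down, so e.g. 1:1:1 over 40 threads yields 13 each (39 total).
  static Map<String, Integer> allot(Map<String, Integer> quotaConfig, int threadCapacity) {
    int sum = 0;
    for (int quota : quotaConfig.values()) {
      sum += quota;
    }
    Map<String, Integer> allotment = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : quotaConfig.entrySet()) {
      // threads = capacity * quota / sum, e.g. 40 * 2 / 4 = 20
      allotment.put(e.getKey(), sum == 0 ? 0 : threadCapacity * e.getValue() / sum);
    }
    return allotment;
  }

  public static void main(String[] args) {
    Map<String, Integer> config = new LinkedHashMap<>();
    config.put("A", 2);
    config.put("B", 1);
    config.put("DEFAULT", 1);
    System.out.println(allot(config, 40)); // prints {A=20, B=10, DEFAULT=10}
  }
}
```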


How to use

  • Setting a Quota Config in ClusterConfig

In order to use quota-based task scheduling, you must establish a quota config first. This is a one-time operation; once you have verified that your ClusterConfig has a quota config set, there is no need to set it again. See the following code snippet for an example:

ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME); // Retrieve ClusterConfig
clusterConfig.resetTaskQuotaRatioMap(); // Optional: you may want to reset the quota config before creating a new quota config
clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 10); // Define the default quota (DEFAULT_QUOTA_TYPE = "DEFAULT")
clusterConfig.setTaskQuotaRatio("A", 20); // Define quota type A
clusterConfig.setTaskQuotaRatio("B", 10); // Define quota type B
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig); // Set the new ClusterConfig

A word of caution: if you do set a quota config, you must always define the default quota type (with the key “DEFAULT”). Otherwise, jobs with no type information will no longer be scheduled and run. If you have been using Task Framework since before quota-based scheduling was introduced, you might have recurrent workflows whose jobs do not have any type set. If you neglect to include the default quota type, these recurrent workflows will not execute properly.
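The caution above can be enforced before the config is written. Here is a minimal, hypothetical client-side check (`QuotaConfigCheck.requireDefaultQuota` is not a Helix API); the `Map<String, Integer>` argument stands in for the type-to-ratio pairs you plan to pass to `setTaskQuotaRatio`. The sketch also rejects a zero DEFAULT quota, on the reasoning that a zero ratio would likewise leave untyped jobs without threads.

```java
import java.util.Map;

// Hypothetical pre-flight check (NOT a Helix API), run on the intended ratios
// before they are written to ClusterConfig.
class QuotaConfigCheck {
  static final String DEFAULT_QUOTA_TYPE = "DEFAULT";

  // Throws if a non-empty quota config lacks a positive DEFAULT quota.
  static void requireDefaultQuota(Map<String, Integer> quotaRatios) {
    if (quotaRatios.isEmpty()) {
      return; // no quota config at all: every job is treated as DEFAULT anyway
    }
    Integer defaultQuota = quotaRatios.get(DEFAULT_QUOTA_TYPE);
    if (defaultQuota == null || defaultQuota <= 0) {
      throw new IllegalArgumentException(
          "Quota config must define a positive \"" + DEFAULT_QUOTA_TYPE + "\" quota: " + quotaRatios);
    }
  }
}
```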


Upon setting the quota config in ClusterConfig, you will see the updated field in your ZooKeeper cluster config ZNode in the JSON format. See an example below:

{
  "id":"Example_Cluster"
  ,"simpleFields":{
    "allowParticipantAutoJoin":"true"
  }
  ,"listFields":{
  }
  ,"mapFields":{
    "QUOTA_TYPES":{
      "A":"20"
      ,"B":"10"
      ,"DEFAULT":"10"
    }
  }
}

  • Setting a quota type for workflows and jobs

The builders for WorkflowConfig and JobConfig provide a method for setting the quota type. See below:

JobConfig.Builder jobBuilderA = new JobConfig.Builder()
    .setCommand(JOB_COMMAND)
    .setJobCommandConfigMap(_jobCommandMap)
    .addTaskConfigs(taskConfigsA)
    .setNumConcurrentTasksPerInstance(50)
    .setJobType("A"); // Set the job's quota type to "A"
workflowBuilder.addJob("JOB_A", jobBuilderA);

FAQ

  • What happens if I don’t set a quota config in ClusterConfig?
    • When no quota config is found in ClusterConfig, Task Framework treats all incoming jobs as DEFAULT and gives 100% of quota resources to the default type.
  • What happens if my job doesn’t have a quota type set?
    • If Task Framework encounters a job without a quota type (that is, the quotaType field is missing, is an empty String, or is a literal “null”), the job is treated as a DEFAULT job.
  • What if there is a workflow/job whose quota type does not exist in the quota config I have in ClusterConfig?
    • Task Framework cannot locate the correct quota type, so it treats the job as the DEFAULT type and assigns and schedules it using the quota for the DEFAULT type.
  • What about targeted jobs?
    • Quotas also apply to targeted jobs, with each task of a targeted job taking up a pre-set resource amount (currently, each task occupies 1 JVM thread).
  • What about job queues?
    • Quota-based scheduling applies to all types of workflows, both generic workflows and job queues. Be careful to always verify that each job’s quota type has been set properly. Task Framework will not automatically delete, or notify you of, jobs that are stuck due to an invalid quota type, so make sure the quota type exists by querying your settings in ClusterConfig.
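The rules in the first three answers above can be summarized in a small function. This is a sketch of the described behavior, not Helix's code: `QuotaTypeResolver.resolve` is a hypothetical name, and the set of configured types stands in for the keys of the quota config in ClusterConfig.

```java
import java.util.Set;

// Sketch of the FAQ rules above; NOT Helix's actual lookup code.
class QuotaTypeResolver {
  static final String DEFAULT_QUOTA_TYPE = "DEFAULT";

  // jobType: the job's quota type as stored (may be null, "" or the literal "null")
  // configuredTypes: quota types present in ClusterConfig (empty if none is set)
  static String resolve(String jobType, Set<String> configuredTypes) {
    if (jobType == null || jobType.isEmpty() || jobType.equals("null")) {
      return DEFAULT_QUOTA_TYPE; // no quota type set on the job
    }
    if (!configuredTypes.contains(jobType)) {
      return DEFAULT_QUOTA_TYPE; // unknown type, or no quota config at all
    }
    return jobType;
  }
}
```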

Future Steps


Quota-based task scheduling has been tested internally at LinkedIn and has been integrated into Apache Gobblin, enabling users of Helix Task Framework and Gobblin’s Job Launcher to define categories and corresponding quota values. There are a few immediate to-dos that will improve the usability of this feature:

  • More fine-grained quota profile

Currently, quota profiles apply across the entire cluster; that is, one quota profile defined in ClusterConfig will apply globally for all Participants. However, some use cases may require that each Participant have a different quota profile.

  • Making Participants’ maximum JVM thread capacity configurable

Helix Task Framework currently sets the maximum number of task threads per Participant at 40. Making this configurable could allow some users to increase task throughput, depending on how long their tasks take to execute.

  • Adding more dimensions to quota resource type

Currently, the number of JVM threads per Participant is the only dimension in which Helix Task Framework defines quotas. However, as discussed in earlier sections, this is extendable to commonly used constraints such as CPU usage, memory usage, or disk usage. As new dimensions are added, additional implementations of the TaskAssigner interface will be needed to produce task assignments based on those constraints.


Added: helix/site-content/1.0.1-docs/recipes/lock_manager.html
URL: http://svn.apache.org/viewvc/helix/site-content/1.0.1-docs/recipes/lock_manager.html?rev=1879414&view=auto
==============================================================================
--- helix/site-content/1.0.1-docs/recipes/lock_manager.html (added)
+++ helix/site-content/1.0.1-docs/recipes/lock_manager.html Wed Jul 1 21:38:35 2020
@@ -0,0 +1,462 @@

Apache Helix - Distributed Lock Manager

Distributed locks are used to synchronize access to shared resources. Most applications today use ZooKeeper to model distributed locks.

The simplest way to model a lock using ZooKeeper is as follows (see the ZooKeeper leader recipe for an exact and more advanced solution):

  • Each process tries to create an ephemeral node.
  • If the node is successfully created, the process acquires the lock.
  • Otherwise, it watches the ZNode and tries to acquire the lock again if the current lock holder disappears.

This is good enough if there is only one lock. In practice, however, an application needs many such locks, and distributing and managing them among different processes becomes challenging. Extending this solution to many locks results in:

  • Uneven distribution of locks among nodes: the node that starts first acquires all the locks, and nodes that start later stay idle.
  • When a node fails, how its locks will be distributed among the remaining nodes is not predictable.
  • When new nodes are added, the current nodes don't relinquish any locks, so the new nodes cannot acquire any.

In other words, we want a system that satisfies the following requirements:

  • Distribute locks evenly among all nodes to get better hardware utilization.
  • If a node fails, the locks that were acquired by that node should be evenly distributed among the other nodes.
  • If nodes are added, locks must be evenly redistributed among nodes.

Helix provides a simple and elegant solution to this problem: simply specify the number of locks, and Helix ensures that the above constraints are satisfied.

To see this working quickly, run the lock-manager-demo script, in which 12 locks are evenly distributed among three nodes; when a node fails, its locks are redistributed among the remaining two nodes. Note that Helix does not completely reshuffle the locks; instead, it simply distributes the locks relinquished by the dead node evenly among the 2 remaining nodes.
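The behavior the demo shows can be illustrated with a simplified model. This is NOT Helix's rebalancing algorithm; it is a hypothetical sketch (`LockDistributionSketch` is an illustrative name) that assigns each lock to the least-loaded node and, when a node fails, moves only that node's locks, leaving every other assignment untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration, NOT Helix's rebalancer: each lock goes to the least-loaded node,
// and a failed node's locks are handed out one by one to the least-loaded survivors.
class LockDistributionSketch {
  // node -> locks it currently holds; TreeMap makes tie-breaking deterministic (by name)
  final Map<String, List<String>> holdings = new TreeMap<>();

  LockDistributionSketch(List<String> nodes, int numLocks, String lockGroup) {
    for (String node : nodes) {
      holdings.put(node, new ArrayList<>());
    }
    for (int i = 0; i < numLocks; i++) {
      leastLoadedNode().add(lockGroup + "_" + i); // e.g. lock-group_0, lock-group_1, ...
    }
  }

  // Removes a failed node; only its locks move, every other lock stays where it is.
  void failNode(String node) {
    List<String> orphaned = holdings.remove(node);
    if (orphaned == null) {
      return; // unknown node, nothing to redistribute
    }
    for (String lock : orphaned) {
      leastLoadedNode().add(lock);
    }
  }

  // Returns the lock list of the node holding the fewest locks (first by name on ties).
  private List<String> leastLoadedNode() {
    List<String> best = null;
    for (List<String> locks : holdings.values()) {
      if (best == null || locks.size() < best.size()) {
        best = locks;
      }
    }
    if (best == null) {
      throw new IllegalStateException("no live nodes left to hold locks");
    }
    return best;
  }
}
```

With 12 locks and three nodes, each node starts with 4 locks; after one node fails, each survivor holds 6 and keeps all of the locks it held before.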


Short Version


This version starts multiple threads within the same process to simulate a multi-node deployment. Try the long version to get a better idea of how it works.

git clone https://git-wip-us.apache.org/repos/asf/helix.git
cd helix
git checkout tags/helix-1.0.1
mvn clean install package -DskipTests
cd recipes/distributed-lock-manager/target/distributed-lock-manager-pkg/bin
chmod +x *
./lock-manager-demo

Output

./lock-manager-demo
STARTING localhost_12000
STARTING localhost_12002
STARTING localhost_12001
STARTED localhost_12000
STARTED localhost_12002
STARTED localhost_12001
localhost_12001 acquired lock:lock-group_3
localhost_12000 acquired lock:lock-group_8
localhost_12001 acquired lock:lock-group_2
localhost_12001 acquired lock:lock-group_4
localhost_12002 acquired lock:lock-group_1
localhost_12002 acquired lock:lock-group_10
localhost_12000 acquired lock:lock-group_7
localhost_12001 acquired lock:lock-group_5
localhost_12002 acquired lock:lock-group_11
localhost_12000 acquired lock:lock-group_6
localhost_12002 acquired lock:lock-group_0
localhost_12000 acquired lock:lock-group_9
lockName    acquired By
======================================
lock-group_0    localhost_12002
lock-group_1    localhost_12002
lock-group_10    localhost_12002
lock-group_11    localhost_12002
lock-group_2    localhost_12001
lock-group_3    localhost_12001
lock-group_4    localhost_12001
lock-group_5    localhost_12001
lock-group_6    localhost_12000
lock-group_7    localhost_12000
lock-group_8    localhost_12000
lock-group_9    localhost_12000
Stopping localhost_12000
localhost_12000 Interrupted
localhost_12001 acquired lock:lock-group_9
localhost_12001 acquired lock:lock-group_8
localhost_12002 acquired lock:lock-group_6
localhost_12002 acquired lock:lock-group_7
lockName    acquired By
======================================
lock-group_0    localhost_12002
lock-group_1    localhost_12002
lock-group_10    localhost_12002
lock-group_11    localhost_12002
lock-group_2    localhost_12001
lock-group_3    localhost_12001
lock-group_4    localhost_12001
lock-group_5    localhost_12001
lock-group_6    localhost_12002
lock-group_7    localhost_12002
lock-group_8    localhost_12001
lock-group_9    localhost_12001

Long version


This version provides more details on how to set up the cluster and where to plug in application code.

Start ZooKeeper

./start-standalone-zookeeper 2199

Create a Cluster

./helix-admin --zkSvr localhost:2199 --addCluster lock-manager-demo

Create a Lock Group


Create a lock group and specify the number of locks in the lock group.

./helix-admin --zkSvr localhost:2199 --addResource lock-manager-demo lock-group 6 OnlineOffline --mode AUTO_REBALANCE

Start the Nodes


Create a Lock class that handles the callbacks.

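import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

// Helix dispatches OnlineOffline transitions to the methods annotated with @Transition
@StateModelInfo(initialState = "OFFLINE", states = {"OFFLINE", "ONLINE"})
public class Lock extends StateModel {
  private String lockName;

  public Lock(String lockName) {
    this.lockName = lockName;
  }

  // OFFLINE -> ONLINE: this node now holds the lock
  @Transition(from = "OFFLINE", to = "ONLINE")
  public void lock(Message m, NotificationContext context) {
    System.out.println(context.getManager().getInstanceName() + " acquired lock:" + lockName);
  }

  // ONLINE -> OFFLINE: this node gives up the lock
  @Transition(from = "ONLINE", to = "OFFLINE")
  public void release(Message m, NotificationContext context) {
    System.out.println(context.getManager().getInstanceName() + " releasing lock:" + lockName);
  }
}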

and a LockFactory that creates Locks

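import org.apache.helix.participant.statemachine.StateModelFactory;

public class LockFactory extends StateModelFactory<Lock> {
  /* Instantiates the lock handler, one per lockName (partition) in the lock group */
  @Override
  public Lock createNewStateModel(String resourceName, String lockName) {
    return new Lock(lockName);
  }
}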

At node startup, simply join the cluster, and Helix will invoke the appropriate callbacks on the appropriate Lock instance. You can start any number of nodes; Helix detects that a new node has joined the cluster and redistributes the locks automatically.

import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;

public class LockProcess {
  public static void main(String[] args) throws Exception {
    String zkAddress = "localhost:2199";
    String clusterName = "lock-manager-demo";
    // Give a unique id to each process; the most commonly used format is hostname_port
    String instanceName = "localhost_12000";
    ZKHelixAdmin admin = new ZKHelixAdmin(zkAddress);
    // Configure the instance and provide some metadata
    InstanceConfig config = new InstanceConfig(instanceName);
    config.setHostName("localhost");
    config.setPort("12000");
    admin.addInstance(clusterName, config);
    // Join the cluster as a participant and register the lock state model factory
    HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
                                                                 instanceName,
                                                                 InstanceType.PARTICIPANT,
                                                                 zkAddress);
    LockFactory modelFactory = new LockFactory();
    manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", modelFactory);
    manager.connect();
    // Keep the process alive so it continues to hold (and be assigned) locks
    Thread.currentThread().join();
  }
}

Start the Controller


The controller can be started either as a separate process or embedded within each node process.

Separate Process

This is recommended when the number of nodes in the cluster exceeds 100. For fault tolerance, you can run multiple controllers on different boxes.

./run-helix-controller --zkSvr localhost:2199 --cluster lock-manager-demo > /tmp/controller.log 2>&1 &

Embedded Within the Node Process

This is recommended when the number of nodes in the cluster is less than 100. To start a controller from each process, simply add the following lines to LockProcess:

import org.apache.helix.HelixManager;
import org.apache.helix.controller.HelixControllerMain;

public class LockProcess {
  public static void main(String[] args) throws Exception {
    String zkAddress = "localhost:2199";
    String clusterName = "lock-manager-demo";
    // ... (same participant setup as in the LockProcess example above)
    manager.connect();
    // Also run a controller inside this process
    HelixManager controller;
    controller = HelixControllerMain.startHelixController(zkAddress,
                                                          clusterName,
                                                          "controller",
                                                          HelixControllerMain.STANDALONE);
    Thread.currentThread().join();
  }
}

Added: helix/site-content/1.0.1-docs/recipes/rabbitmq_consumer_group.html
URL: http://svn.apache.org/viewvc/helix/site-content/1.0.1-docs/recipes/rabbitmq_consumer_group.html?rev=1879414&view=auto
==============================================================================
--- helix/site-content/1.0.1-docs/recipes/rabbitmq_consumer_group.html (added)
+++ helix/site-content/1.0.1-docs/recipes/rabbitmq_consumer_group.html Wed Jul 1 21:38:35 2020
@@ -0,0 +1,417 @@

Apache Helix - RabbitMQ Consumer Group

RabbitMQ is well-known open-source software that provides robust messaging for applications.

One commonly implemented recipe using this software is a work queue. http://www.rabbitmq.com/tutorials/tutorial-four-java.html describes the use case in which:

  • A producer sends a message with a routing key.
  • The message is routed to the queue whose binding key exactly matches the routing key of the message.
  • There are multiple consumers, and each consumer is interested in processing only a subset of the messages by binding to the keys it is interested in.

The example provided here describes how multiple consumers can be started to process all the messages.


While this works, production systems need the following:

  • Ability to handle failures: when a consumer fails, either another consumer must be started, or the other consumers must start processing the messages that the failed consumer would have processed.
  • Ability to scale: when the existing consumers cannot keep up with the task generation rate, new consumers are added, and the tasks must be redistributed among all the consumers.

In this recipe, we demonstrate handling of consumer failures and new consumer additions using Helix.


Mapping this use case to Helix is straightforward, because the binding key/routing key is equivalent to a partition.

Let’s take an example. Say the queue has 6 partitions, and we have 2 consumers to process all the queues. What we want is for all 6 queues to be evenly divided between the 2 consumers, 3 queues each. Eventually, when the system scales, we add more consumers to keep up; with a third consumer, each consumer processes tasks from 2 queues. Now say that a consumer fails, reducing the number of active consumers to 2. This means each consumer must again process 3 queues.

We showcase how such a dynamic application can be developed using Helix. Even though we use RabbitMQ as the pub/sub system, one can extend this solution to other pub/sub systems.

Try It

git clone https://git-wip-us.apache.org/repos/asf/helix.git
cd helix
git checkout tags/helix-1.0.1
mvn clean install package -DskipTests
# Run the exports from the root of the helix checkout so that `pwd` resolves to the right paths
export HELIX_PKG_ROOT=`pwd`/helix-core/target/helix-core-pkg
export HELIX_RABBITMQ_ROOT=`pwd`/recipes/rabbitmq-consumer-group/
chmod +x $HELIX_PKG_ROOT/bin/*
chmod +x $HELIX_RABBITMQ_ROOT/bin/*

Install RabbitMQ


Setting up RabbitMQ on a local box is straightforward. You can find the instructions at http://www.rabbitmq.com/download.html

Start ZK


Start ZooKeeper at port 2199

$HELIX_PKG_ROOT/bin/start-standalone-zookeeper 2199

Set Up the Consumer Group Cluster

This sets up the cluster by creating a “rabbitmq-consumer-group” cluster and adding a “topic” with 6 queues.

$HELIX_RABBITMQ_ROOT/bin/setup-cluster.sh localhost:2199

Add Consumers


Start 2 consumers in 2 different terminals. Each consumer is given a unique ID.

# start-consumer.sh zookeeperAddress (e.g. localhost:2181) consumerId rabbitmqServer (e.g. localhost)
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 0 localhost
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 1 localhost

Start the Helix Controller


Now start a Helix controller that starts managing the “rabbitmq-consumer-group” cluster.

$HELIX_RABBITMQ_ROOT/bin/start-cluster-manager.sh localhost:2199

Send Messages to the Topic


Start sending messages to the topic. This script randomly selects a routing key (1-6) and sends the message to the topic. Based on the key, each message is routed to the appropriate queue.

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 20

After running this, you should see all 20 messages being processed by 2 consumers.


Add Another Consumer


Once a new consumer is started, Helix detects it. To balance the load among 3 consumers, it deallocates 1 partition from each existing consumer and allocates it to the new consumer, so each consumer now processes only 2 queues. For a given partition, Helix makes sure that the old consumer is asked to stop consuming before the new consumer is asked to start consuming. However, the transitions for different partitions can happen in parallel.

$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 2 localhost

Send messages again to the topic

$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 100

You should see that messages are now received by all 3 consumers.
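The handoff rule from the Add Another Consumer step (for a given partition, the old consumer stops before the new one starts) can be sketched as a plan computed from the old and new assignments. This is an illustrative sketch, not Helix controller code: `HandoffPlanSketch` is hypothetical, and it uses a simpler, stricter ordering than the one described, placing every ONLINE->OFFLINE step before any OFFLINE->ONLINE step. That still satisfies the per-partition rule while letting the steps within each phase run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch, NOT Helix controller code: turns old and new partition -> consumer
// assignments into transitions where a moved partition's old owner stops first.
class HandoffPlanSketch {
  static List<String> plan(Map<String, String> oldOwner, Map<String, String> newOwner) {
    List<String> offline = new ArrayList<>();
    List<String> online = new ArrayList<>();
    // Sorted partition order keeps the plan stable and readable.
    Map<String, String> sorted = new TreeMap<>(newOwner);
    for (Map.Entry<String, String> e : sorted.entrySet()) {
      String partition = e.getKey();
      String before = oldOwner.get(partition);
      String after = e.getValue();
      if (after.equals(before)) {
        continue; // unchanged partitions need no transition
      }
      if (before != null) {
        offline.add(before + ": " + partition + " ONLINE->OFFLINE");
      }
      online.add(after + ": " + partition + " OFFLINE->ONLINE");
    }
    // Phase 1 (all stops) precedes phase 2 (all starts); steps within a phase are independent.
    List<String> ordered = new ArrayList<>(offline);
    ordered.addAll(online);
    return ordered;
  }
}
```

For the 2-to-3 consumer scale-out above, only the two moved partitions appear in the plan; the four partitions that stay put generate no transitions.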


Stop a Consumer


In any consumer's terminal, press Ctrl+C. Notice that Helix detects the consumer failure and distributes the 2 partitions that were processed by the failed consumer to the remaining 2 active consumers.

How does this work?


Find the entire code here.


Cluster Setup


This step creates a ZNode in ZooKeeper for the cluster and adds the state model. We use the OnlineOffline state model, since there is no need for other states: a consumer is either processing a queue or it is not.

It creates a resource called “rabbitmq-consumer-group” with 6 partitions. The execution mode is set to AUTO_REBALANCE. This means that Helix controls the assignment of partitions to consumers and automatically distributes the partitions evenly among the active consumers. When a consumer is added or removed, Helix ensures that only a minimal number of partitions are shuffled.

zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
    ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);

// add cluster
admin.addCluster(clusterName, true);

// add state model definition
StateModelConfigGenerator generator = new StateModelConfigGenerator();
admin.addStateModelDef(clusterName, "OnlineOffline",
    new StateModelDefinition(generator.generateConfigForOnlineOffline()));

// add the resource for the "topic", which has 6 partitions (one per queue)
String resourceName = "rabbitmq-consumer-group";
admin.addResource(clusterName, resourceName, 6, "OnlineOffline", "AUTO_REBALANCE");

Starting the Consumers


The only things a consumer needs to know are the ZooKeeper address, the cluster name, and its consumer ID. It does not need to know anything else.

_manager = HelixManagerFactory.getZKHelixManager(_clusterName,
                                                 _consumerId,
                                                 InstanceType.PARTICIPANT,
                                                 _zkAddr);

StateMachineEngine stateMach = _manager.getStateMachineEngine();
ConsumerStateModelFactory modelFactory =
    new ConsumerStateModelFactory(_consumerId, _mqServer);
stateMach.registerStateModelFactory("OnlineOffline", modelFactory);

_manager.connect();

Once the consumer has registered the state model and the controller is started, the consumer starts getting callbacks (onBecomeOnlineFromOffline) for the partitions it needs to host. All it needs to do as part of the callback is start consuming messages from the appropriate queue. Similarly, when the controller deallocates a partition from a consumer, it fires onBecomeOfflineFromOnline for the same partition. As part of this transition, the consumer stops consuming from that queue.

@Transition(to = "ONLINE", from = "OFFLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
  LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
  if (_thread == null) {
    LOG.debug("Starting ConsumerThread for " + _partition + "...");
    _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
    _thread.start();
    LOG.debug("Starting ConsumerThread for " + _partition + " done");
  }
}

@Transition(to = "OFFLINE", from = "ONLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
    throws InterruptedException {
  LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
  if (_thread != null) {
    LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
    _thread.interrupt();
    _thread.join(2000);
    _thread = null;
    LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
  }
}