RabbitMQ is well-known open source software the provides robust messaging for applications.
One of the commonly implemented recipes using this software is a work queue. http://www.rabbitmq.com/tutorials/tutorial-four-java.html describes the use case where
The example provided here describes how multiple consumers can be started to process all the messages.
While this works, in production systems one needs the following:
In this recipe, we demonstrate handling of consumer failures and new consumer additions using Helix.
Mapping this usecase to Helix is pretty easy as the binding key/routing key is equivalent to a partition.
Let‘s take an example. Lets say the queue has 6 partitions, and we have 2 consumers to process all the queues. What we want is all 6 queues to be evenly divided among 2 consumers. Eventually when the system scales, we add more consumers to keep up. This will make each consumer process tasks from 2 queues. Now let’s say that a consumer failed, reducing the number of active consumers to 2. This means each consumer must process 3 queues.
We showcase how such a dynamic application can be developed using Helix. Even though we use RabbitMQ as the pub/sub system one can extend this solution to other pub/sub systems.
git clone https://git-wip-us.apache.org/repos/asf/helix.git cd helix git checkout tags/helix-0.9.4 mvn clean install package -DskipTests cd recipes/rabbitmq-consumer-group/bin chmod +x * export HELIX_PKG_ROOT=`pwd`/helix-core/target/helix-core-pkg export HELIX_RABBITMQ_ROOT=`pwd`/recipes/rabbitmq-consumer-group/ chmod +x $HELIX_PKG_ROOT/bin/* chmod +x $HELIX_RABBITMQ_ROOT/bin/*
Setting up RabbitMQ on a local box is straightforward. You can find the instructions here http://www.rabbitmq.com/download.html
Start ZooKeeper at port 2199
$HELIX_PKG_ROOT/bin/start-standalone-zookeeper 2199
This will setup the cluster by creating a “rabbitmq-consumer-group” cluster and adds a “topic” with “6” queues.
$HELIX_RABBITMQ_ROOT/bin/setup-cluster.sh localhost:2199
Start 2 consumers in 2 different terminals. Each consumer is given a unique ID.
//start-consumer.sh zookeeperAddress (e.g. localhost:2181) consumerId , rabbitmqServer (e.g. localhost) $HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 0 localhost $HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 1 localhost
Now start a Helix controller that starts managing the “rabbitmq-consumer-group” cluster.
$HELIX_RABBITMQ_ROOT/bin/start-cluster-manager.sh localhost:2199
Start sending messages to the topic. This script randomly selects a routing key (1-6) and sends the message to topic. Based on the key, messages gets routed to the appropriate queue.
$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 20
After running this, you should see all 20 messages being processed by 2 consumers.
Once a new consumer is started, Helix detects it. In order to balance the load between 3 consumers, it deallocates 1 partition from the existing consumers and allocates it to the new consumer. We see that each consumer is now processing only 2 queues. Helix makes sure that old nodes are asked to stop consuming before the new consumer is asked to start consuming for a given partition. But the transitions for each partition can happen in parallel.
$HELIX_RABBITMQ_ROOT/bin/start-consumer.sh localhost:2199 2 localhost
Send messages again to the topic
$HELIX_RABBITMQ_ROOT/bin/send-message.sh localhost 100
You should see that messages are now received by all 3 consumers.
In any terminal press CTRL^C and notice that Helix detects the consumer failure and distributes the 2 partitions that were processed by failed consumer to the remaining 2 active consumers.
Find the entire code here.
This step creates ZNode on ZooKeeper for the cluster and adds the state model. We use online offline state model since there is no need for other states. The consumer is either processing a queue or it is not.
It creates a resource called “rabbitmq-consumer-group” with 6 partitions. The execution mode is set to AUTO_REBALANCE. This means that the Helix controls the assignment of partition to consumers and automatically distributes the partitions evenly among the active consumers. When a consumer is added or removed, it ensures that a minimum number of partitions are shuffled.
zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); ZKHelixAdmin admin = new ZKHelixAdmin(zkclient); // add cluster admin.addCluster(clusterName, true); // add state model definition StateModelConfigGenerator generator = new StateModelConfigGenerator(); admin.addStateModelDef(clusterName, "OnlineOffline", new StateModelDefinition(generator.generateConfigForOnlineOffline())); // add resource "topic" which has 6 partitions String resourceName = "rabbitmq-consumer-group"; admin.addResource(clusterName, resourceName, 6, "OnlineOffline", "AUTO_REBALANCE");
The only thing consumers need to know is the ZooKeeper address, cluster name and consumer ID. It does not need to know anything else.
_manager = HelixManagerFactory.getZKHelixManager(_clusterName, _consumerId, InstanceType.PARTICIPANT, _zkAddr); StateMachineEngine stateMach = _manager.getStateMachineEngine(); ConsumerStateModelFactory modelFactory = new ConsumerStateModelFactory(_consumerId, _mqServer); stateMach.registerStateModelFactory("OnlineOffline", modelFactory); _manager.connect();
Once the consumer has registered the state model and the controller is started, the consumer starts getting callbacks (onBecomeOnlineFromOffline) for the partition it needs to host. All it needs to do as part of the callback is to start consuming messages from the appropriate queue. Similarly, when the controller deallocates a partitions from a consumer, it fires onBecomeOfflineFromOnline for the same partition. As a part of this transition, the consumer will stop consuming from a that queue.
@Transition(to = "ONLINE", from = "OFFLINE") public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition); if (_thread == null) { LOG.debug("Starting ConsumerThread for " + _partition + "..."); _thread = new ConsumerThread(_partition, _mqServer, _consumerId); _thread.start(); LOG.debug("Starting ConsumerThread for " + _partition + " done"); } } @Transition(to = "OFFLINE", from = "ONLINE") public void onBecomeOfflineFromOnline(Message message, NotificationContext context) throws InterruptedException { LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition); if (_thread != null) { LOG.debug("Stopping " + _consumerId + " for " + _partition + "..."); _thread.interrupt(); _thread.join(2000); _thread = null; LOG.debug("Stopping " + _consumerId + " for " + _partition + " done"); } }