| h1. Distributed Priority Queue |
| |
| h2. *IMPORTANT* \- We recommend that you do NOT use ZooKeeper for Queues. Please see [[Tech Note 4|https://cwiki.apache.org/confluence/display/CURATOR/TN4]] for details. |
| |
| h2. Description |
| An implementation of the Distributed Priority Queue ZK recipe. |
| |
| h2. Participating Classes |
| * QueueBuilder |
| * QueueConsumer |
| * QueueSerializer |
| * DistributedPriorityQueue |
| |
| h2. Usage |
| h3. Creating a DistributedPriorityQueue |
| {code} |
| public static <T> QueueBuilder<T> builder(CuratorFramework client, |
| QueueConsumer<T> consumer, |
| QueueSerializer<T> serializer, |
| java.lang.String queuePath) |
| Parameters: |
| client - the curator client |
| consumer - message consumer |
| serializer - serializer to use for items |
| queuePath - path to store queue |
| {code} |
| |
| {code} |
| QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path); |
| // ... more builder method calls as needed ... |
| DistributedPriorityQueue<MessageType> queue = builder.buildPriorityQueue(minItemsBeforeRefresh); |
| {code} |
| |
| {code} |
| public DistributedPriorityQueue<T> buildPriorityQueue(int minItemsBeforeRefresh) |
| Build a DistributedPriorityQueue from the current builder values. |
| |
| When the priority queue detects an item addition/removal, it will stop processing its current list of items and refresh the |
| list. minItemsBeforeRefresh modifies this. It determines the minimum number of items from the active list that will get processed before a refresh. |
| |
| Due to a quirk in the way ZooKeeper notifies changes, the queue will get an item addition/removal notification after every item is |
| processed. This can lead to poor performance. Set minItemsBeforeRefresh to the number of items your application can tolerate being out of sync by. |
| |
| For example: if the queue sees 10 items to process, it will end up making 10 calls to ZooKeeper to check status. You can control |
| this by setting minItemsBeforeRefresh to 10 (or more) and the queue will only refresh with ZooKeeper after 10 items are processed. |
| |
| Parameters: |
| minItemsBeforeRefresh - minimum items to process before refreshing the item list |
| {code} |
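
The arithmetic in the example above can be sketched with a toy model. This is not Curator's implementation: the class {{RefreshModel}} and the method {{countListFetches}} are hypothetical names, and the model assumes, as described above, that every processed item triggers a change notification and that the queue re-reads its item list as soon as the minimum has been processed.

```java
class RefreshModel {
    /**
     * Counts how many times the item list would be read from ZooKeeper while
     * draining totalItems, under the simplifying assumptions stated above.
     */
    static int countListFetches(int totalItems, int minItemsBeforeRefresh) {
        int remaining = totalItems;
        int fetches = 0;
        while (remaining > 0) {
            fetches++; // read the current item list
            // At least one item is processed per read, even when the minimum is 0.
            int batch = Math.min(remaining, Math.max(1, minItemsBeforeRefresh));
            remaining -= batch; // process the batch, then refresh
        }
        return fetches;
    }

    public static void main(String[] args) {
        System.out.println(countListFetches(10, 0));  // prints 10
        System.out.println(countListFetches(10, 10)); // prints 1
        System.out.println(countListFetches(10, 3));  // prints 4
    }
}
```

In this model a larger minimum trades freshness for fewer reads: with a minimum of 10, items added during the batch are not seen until the batch completes.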
| |
| h2. General Usage |
| The queue must be started via the {{start()}} method. Call {{close()}} when you are done with the queue. |
| |
| To add messages to the queue: |
| {code} |
| queue.put(aMessage, priority); |
| {code} |
| |
| The consumer ({{QueueConsumer.consumeMessage()}}) will get called as messages arrive. |
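
A minimal sketch of the full lifecycle follows, assuming a ZooKeeper server at {{localhost:2181}} and String messages. The connection string, the queue path {{/example/queue}}, the priority values, the sleep duration and the class name {{PriorityQueueExample}} are illustrative assumptions, not values taken from this page.

```java
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

class PriorityQueueExample {
    // Serializes String messages as UTF-8 bytes.
    static final QueueSerializer<String> SERIALIZER = new QueueSerializer<String>() {
        @Override
        public byte[] serialize(String item) {
            return item.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    // Prints each message and each connection state change.
    static final QueueConsumer<String> CONSUMER = new QueueConsumer<String>() {
        @Override
        public void consumeMessage(String message) {
            System.out.println("consumed: " + message);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            System.out.println("connection state: " + newState);
        }
    };

    public static void main(String[] args) throws Exception {
        // Assumed connection string and retry settings.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        DistributedPriorityQueue<String> queue = QueueBuilder
            .builder(client, CONSUMER, SERIALIZER, "/example/queue")
            .buildPriorityQueue(10);
        try {
            queue.start();            // must be called before the queue is used
            queue.put("first", 1);    // priorities here are arbitrary examples
            queue.put("second", 5);
            Thread.sleep(2000);       // crude wait so the consumer can run
        } finally {
            queue.close();            // call when done with the queue
            client.close();
        }
    }
}
```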
| |
| h2. Lock Safety |
| In the general usage case, the message is removed from the queue before the consumer is called. A more atomic mode is |
| provided that removes the item from the queue only after the consumer returns successfully. To enable this mode, call the |
| {{lockPath()}} method of the {{QueueBuilder}}. This uses a lock to make the message recoverable. A lock is held while the |
| message is being processed \- this prevents other processes from taking the message. The message is not removed from the queue |
| until the consumer returns. Thus, if there is a failure or the process dies, the message is delivered to another process. |
| This behavior carries a small performance penalty, however. |
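
Enabling this mode could look like the sketch below. The paths {{/example/queue}} and {{/example/queue-locks}}, the class name {{LockSafeQueueFactory}} and the method name {{buildLockSafe}} are hypothetical; only {{lockPath()}} and the builder calls come from the API described on this page.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;

class LockSafeQueueFactory {
    // Builds a priority queue whose items are removed only after the
    // consumer returns, by supplying a lock path to the builder.
    static <T> DistributedPriorityQueue<T> buildLockSafe(
            CuratorFramework client,
            QueueConsumer<T> consumer,
            QueueSerializer<T> serializer,
            int minItemsBeforeRefresh) {
        return QueueBuilder
            .builder(client, consumer, serializer, "/example/queue") // assumed queue path
            .lockPath("/example/queue-locks")                        // assumed lock path
            .buildPriorityQueue(minItemsBeforeRefresh);
    }
}
```

Because a message may be redelivered after a failure, a consumer used in this mode should tolerate seeing the same message more than once.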
| |
| h2. Data Format |
| Same as [[Distributed Queue|distributed-queue.html]]. |
| |
| h2. Error Handling |
| The {{QueueConsumer}} interface extends {{ConnectionStateListener}}. When the queue is started, it adds the consumer as a listener to the |
| Curator instance. Users of the {{DistributedPriorityQueue}} must pay attention to any connection state changes. |
| |
| If the SUSPENDED state is reported, the instance must assume that, until it receives a RECONNECTED state, the queue is no longer being |
| updated. If the LOST state is reported, the queue is permanently down. |
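
One way to follow these rules is to track the state inside the consumer. The sketch below is an illustration only: the class {{StateAwareConsumer}} and its accessor methods are hypothetical, and it assumes the connection is healthy when the queue is started.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;

class StateAwareConsumer implements QueueConsumer<String> {
    // True while the queue can be assumed to be receiving updates.
    private final AtomicBoolean queueLive = new AtomicBoolean(true);
    // True once LOST has been reported; treated as permanent, per the text above.
    private final AtomicBoolean lost = new AtomicBoolean(false);

    @Override
    public void consumeMessage(String message) {
        System.out.println("consumed: " + message);
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                queueLive.set(false); // no updates until RECONNECTED
                break;
            case RECONNECTED:
                if (!lost.get()) {
                    queueLive.set(true);
                }
                break;
            case LOST:
                lost.set(true);
                queueLive.set(false); // the queue is considered permanently down
                break;
            default:
                break; // other states are ignored in this sketch
        }
    }

    boolean isQueueLive() {
        return queueLive.get();
    }

    boolean isLost() {
        return lost.get();
    }
}
```

Application code can check {{isQueueLive()}} before relying on the queue, and discard the queue and build a new one once {{isLost()}} returns true.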