-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

* A new design for Qpid clustering.

** Issues with old cluster design

See [[./old-cluster-issues.txt]]

** A new cluster design.

1. Clearly defined interface between broker code and cluster plug-in.

2. Replicate queue events rather than client data.
   - Only requires consistent enqueue order.
   - Events need only be serialized per-queue, which allows concurrency between queues.
   - Allows for replicated and non-replicated queues.

3. Use a lock protocol to agree on the order of dequeues: only the
   broker holding the lock can acquire & dequeue. No longer relies on
   identical state and lock-step behavior to cause identical dequeues
   on each broker.

4. Use multiple CPG groups to process different queues in
   parallel. Use a fixed set of groups and hash queue names to choose
   the group for each queue.

*** Requirements

The cluster must provide these delivery guarantees:

- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on the cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on the cluster.

Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept).

*** Broker receiving messages

On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.

Other brokers enqueue the message when they receive the message-received event.

Enqueues are queued up with other queue operations to be executed in the
thread context associated with the queue.
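
This path might be sketched as follows; the types and callbacks are
invented for illustration and are not the actual Qpid classes:

#+begin_src c++
#include <functional>
#include <string>
#include <utility>

// Hypothetical event type, for illustration only.
struct Event { std::string queue; std::string message; };

class TransferHandler {
    std::function<void(const Event&)> multicast; // send via CPG
    std::function<void(const Event&)> enqueue;   // runs in the queue's thread context
    std::function<void(const Event&)> complete;  // completes the client's transfer
public:
    TransferHandler(std::function<void(const Event&)> mc,
                    std::function<void(const Event&)> enq,
                    std::function<void(const Event&)> cmp)
        : multicast(std::move(mc)), enqueue(std::move(enq)), complete(std::move(cmp)) {}

    // Connection thread: do not enqueue locally, just multicast.
    void onTransfer(const Event& e) { multicast(e); }

    // CPG deliver thread: every broker enqueues; the original sender
    // (self-delivery) also completes the transfer.
    void onDeliver(const Event& e, bool fromSelf) {
        enqueue(e);
        if (fromSelf) complete(e);
    }
};
#+end_src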

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the cluster.
Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.

We assume the same IO-driven dequeuing algorithm as the standalone
broker, with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.

At any given time only those queues owned by the local broker will be
unlocked.

As messages are acquired/dequeued from unlocked queues by the IO threads,
the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.
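
A minimal sketch of the ownership rule, with invented names; the
multicastRelease callback stands in for multicasting the release-queue
event:

#+begin_src c++
#include <functional>
#include <utility>

class QueueOwnership {
    bool owned = false;                     // a queue is locked unless we own it
    std::function<void()> multicastRelease; // multicasts the release-queue event
public:
    explicit QueueOwnership(std::function<void()> release)
        : multicastRelease(std::move(release)) {}

    // Called when this broker is granted ownership of the queue.
    void granted() { owned = true; }

    // The output algorithm skips locked queues.
    bool unlocked() const { return owned; }

    // Called from the IO/output path: give the queue up when consumers
    // run out of credit or our time slice expires.
    void maybeRelease(bool consumersHaveCredit, bool timeSliceExpired) {
        if (owned && (!consumersHaveCredit || timeSliceExpired)) {
            owned = false;
            multicastRelease(); // another interested broker can now take over
        }
    }
};
#+end_src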

*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed until the message has been forgotten.

On receiving an accept, the broker:
- dequeues the message from the local queue
- multicasts an "accept" event
- completes the accept asynchronously when the dequeue event is self-delivered.

NOTE: The message store does not currently implement asynchronous
completion of accepts; this is a bug.
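
A minimal sketch of the deferred completion, assuming a hypothetical
tracker keyed by queue position (all names invented):

#+begin_src c++
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

using Position = uint64_t;            // position of the message on its queue
using Completion = std::function<void()>;

class AcceptTracker {
    std::map<Position, Completion> pending;
public:
    // Connection thread: local dequeue done and accept event
    // multicast; park the completion until self-delivery.
    void accepted(Position pos, Completion complete) {
        pending[pos] = std::move(complete);
    }

    // CPG deliver thread: our own accept event has come back, so the
    // whole cluster has seen the dequeue and we can complete.
    void selfDelivered(Position pos) {
        auto i = pending.find(pos);
        if (i != pending.end()) { i->second(); pending.erase(i); }
    }
};
#+end_src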

*** Multiple CPG groups.

The old cluster was bottlenecked by processing everything in a single
CPG deliver thread.

The new cluster uses a set of CPG groups, one per core. Queue names
are hashed to give group indexes, so statistically queues are likely
to be spread evenly over the set of groups.

Operations on a given queue always use the same group, so we have
total order within each queue, but operations on different queues can
use different groups, giving greater throughput sending to CPG and
multiple handler threads to process CPG messages.
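
A sketch of the queue-to-group mapping. The one hard requirement is
that every broker computes the same hash, so the sketch uses a fixed
algorithm (FNV-1a, chosen here for illustration) rather than
std::hash, whose result is implementation-defined:

#+begin_src c++
#include <cstddef>
#include <cstdint>
#include <string>

// FNV-1a: simple, deterministic, and identical on every broker.
uint32_t fnv1a(const std::string& s) {
    uint32_t h = 2166136261u;
    for (unsigned char c : s) { h ^= c; h *= 16777619u; }
    return h;
}

// All events for one queue go through one group (total order per
// queue); different queues spread over the fixed set of groups.
size_t groupForQueue(const std::string& queueName, size_t numGroups) {
    return fnv1a(queueName) % numGroups;
}
#+end_src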

** Inconsistent errors.

An inconsistent error means that after multicasting an enqueue, accept
or dequeue, some brokers succeed in processing it and others fail.

The new design eliminates most sources of inconsistent errors in the
old broker: connections, sessions, security, management etc. Only
store journal errors remain.

The new inconsistent error protocol is similar to the old one, with one
major improvement: brokers do not have to stall processing while an
error is being resolved.
** Updating new members

When a new member (the updatee) joins a cluster it needs to be brought
up to date with the rest of the cluster. An existing member (the
updater) sends an "update".

In the old cluster design the update is a snapshot of the entire
broker state. To ensure consistency of the snapshot, both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a back-log of work to get through,
which leaves them lagging behind the rest of the cluster until they
catch up (which is not guaranteed to happen in bounded time).

With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.

We update individual objects (queues and exchanges) independently:
- create queues first, then update all queues and exchanges in parallel.
- use multiple updater threads, one per queue/exchange.

Queue updater:
- marks the queue position at the sync point.
- sends messages starting from the sync point, working towards the head of the queue.
- sends a "done" message.
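
A sketch of the updater's send loop, with invented names, assuming the
messages between the head of the queue and the sync point are held in
queue order; they are sent newest-first so the updatee can push each
one onto the front of its queue:

#+begin_src c++
#include <deque>
#include <functional>
#include <string>

struct Msg { std::string body; };

// Walk from the sync point towards the head, then signal "done".
void updateQueue(const std::deque<Msg>& tail,          // queue order: head first
                 std::function<void(const Msg&)> send, // sends to the updatee
                 std::function<void()> done) {         // the "done" message
    for (auto i = tail.rbegin(); i != tail.rend(); ++i) send(*i);
    done();
}
#+end_src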

Queue updatee:
- enqueues received from CPG: add to the back of the queue as normal.
- dequeues received from CPG: apply if the message is found, else save to check at the end of the update.
- messages from the updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.
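
A hedged sketch of the updatee's merge logic (all names invented);
normal CPG traffic is applied throughout, and dequeues for messages
not yet received are saved and re-checked when the update completes:

#+begin_src c++
#include <cstdint>
#include <deque>
#include <set>
#include <string>
#include <utility>

using Position = uint64_t;

class UpdateeQueue {
    std::deque<std::pair<Position, std::string>> q; // front = head of queue
    std::set<Position> savedDequeues; // dequeues for messages not seen yet
    bool updating = true;
public:
    void cpgEnqueue(Position p, std::string m) { q.emplace_back(p, std::move(m)); }

    void cpgDequeue(Position p) {
        if (!erase(p) && updating) savedDequeues.insert(p);
    }

    // Updater messages arrive newest-first and go on the *front*.
    void updaterMessage(Position p, std::string m) { q.emplace_front(p, std::move(m)); }

    void updateComplete() {
        updating = false;
        for (Position p : savedDequeues) erase(p);
        savedDequeues.clear();
    }
private:
    bool erase(Position p) {
        for (auto i = q.begin(); i != q.end(); ++i)
            if (i->first == p) { q.erase(i); return true; }
        return false;
    }
};
#+end_src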

Exchange updater:
- sends a snapshot of the exchange as it was at the sync point.

Exchange updatee:
- queues up exchange operations received after the sync point.
- when the snapshot is received: applies the saved operations.
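
A small sketch of the updatee side, buffering operations until the
snapshot arrives (names invented):

#+begin_src c++
#include <functional>
#include <utility>
#include <vector>

class ExchangeUpdatee {
    std::vector<std::function<void()>> savedOps; // ops seen after the sync point
    bool haveSnapshot = false;
public:
    // Binding operations delivered by CPG during the update.
    void onExchangeOp(std::function<void()> op) {
        if (haveSnapshot) op();                 // update over: apply directly
        else savedOps.push_back(std::move(op)); // buffer until the snapshot lands
    }

    // Snapshot from the updater: apply it, then replay the saved ops.
    void onSnapshot(/* snapshot applied by caller */) {
        haveSnapshot = true;
        for (auto& op : savedOps) op();
        savedOps.clear();
    }
};
#+end_src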

Note:
- The updater is active throughout; there is no stalling.
- Consuming clients actually reduce the size of the update.
- The updatee stalls clients until the update completes.
  (Note: it may be possible to avoid the updatee stall as well; needs thought.)

** Internal cluster interface

The new cluster interface is similar to the MessageStore interface, but
provides more detail (message positions) and some additional call
points (e.g. acquire).

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind.
- message enqueued/acquired/released/rejected/dequeued.
- transactional events.
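
Written down as a C++ observer, the call points might look like the
sketch below; names and signatures are illustrative, not the actual
Qpid interface:

#+begin_src c++
#include <cstdint>
#include <string>

class ClusterObserver {
public:
    virtual ~ClusterObserver() {}
    // Wiring changes.
    virtual void queueDeclared(const std::string& queue) = 0;
    virtual void exchangeDeclared(const std::string& exchange) = 0;
    virtual void bound(const std::string& exchange, const std::string& queue,
                       const std::string& key) = 0;
    // Message life cycle; messages are identified by queue + position.
    virtual void enqueued(const std::string& queue, uint64_t position) = 0;
    virtual void acquired(const std::string& queue, uint64_t position) = 0;
    virtual void released(const std::string& queue, uint64_t position) = 0;
    virtual void rejected(const std::string& queue, uint64_t position) = 0;
    virtual void dequeued(const std::string& queue, uint64_t position) = 0;
    // Transactional events.
    virtual void txCommitted() = 0;
    virtual void txAborted() = 0;
};
#+end_src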

** Maintainability

This design gives us more robust code with clear and explicit interfaces.

The cluster depends on specific events clearly defined by an explicit
interface. Provided the semantics of this interface are not violated,
the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions, and so is
protected from future protocol changes (e.g. AMQP 1.0).

A number of specific ways the code will be simplified:
- drop code to replicate the management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from the cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code; no need to stall.

** Performance

The standalone broker processes _connections_ concurrently, so CPU
usage increases as you add more connections.

The new cluster processes _queues_ concurrently, so CPU usage increases as you
add more queues.

In both cases, CPU usage peaks when the number of "units of
concurrency" (connections or queues) goes above the number of cores.

When all consumers on a queue are connected to the same broker, the new
cluster uses the same message allocation threading/logic as a
standalone broker, with a little extra asynchronous book-keeping.

If a queue has multiple consumers connected to multiple brokers, the
new cluster time-shares the queue, which is less efficient than having
all consumers on a queue connected to the same broker.
** Flow control

The new design does not queue up CPG-delivered messages; they are
processed immediately in the CPG deliver thread. This means that CPG's
flow control is sufficient for Qpid.

** Live upgrades

"Live upgrade" refers to the ability to upgrade a cluster while it is
running, with no downtime. Each broker in the cluster is shut down and
then re-started with a new version of the broker code.

To achieve this:
- The cluster protocol XML file has a new element <version number=N> attached
  to each method. This is the version at which the method was added.
- New versions can only add methods; existing methods cannot be changed.
- The cluster handshake for new members includes the protocol version
  of each member.
- Each CPG message starts with a header: version, size. This allows new encodings later.
- Brokers ignore messages of a higher version.

- The cluster's version is the lowest version among its members.
- A newer broker can join an older cluster. When it does, it must restrict
  itself to speaking the older version of the protocol.
- When the cluster version increases (because the lowest-version member
  has left), the remaining members may move up to the new version.
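
A sketch of these version rules, assuming a fixed-width header and
leaving byte order and framing aside (names invented):

#+begin_src c++
#include <algorithm>
#include <cstdint>
#include <vector>

// Every CPG message starts with this header; new encodings can be
// introduced later behind a higher version number.
struct CpgHeader {
    uint32_t version; // protocol version of the encoding that follows
    uint32_t size;    // size of the body in bytes
};

// Brokers ignore messages of a higher version than their own.
bool shouldProcess(const CpgHeader& h, uint32_t myVersion) {
    return h.version <= myVersion;
}

// The cluster speaks the lowest version among its members
// (assumes at least one member).
uint32_t clusterVersion(const std::vector<uint32_t>& memberVersions) {
    return *std::min_element(memberVersions.begin(), memberVersions.end());
}
#+end_src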

* Design debates

** Active/active vs. active/passive

An active-active cluster can be used in an active-passive mode. In
this mode we would like the cluster to be as efficient as a strictly
active-passive implementation.

An active/passive implementation allows some simplifications over active/active:
- drop queue ownership and locking.
- don't need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if the primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses until they find the active member.

Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover.
- Backup brokers are immediately available on failover.
- As long as a client can connect to any broker in the cluster, it can be served.

Active/passive benefits:
- Don't need to replicate message allocation; can feed consumers at top speed.

Active/passive drawbacks:
- All clients are on one node, so a failure affects every client in the
  system.
- After a failure there is a "reconnect storm" as every client
  reconnects to the new active node.
- After a failure there is a period where no broker is active, until
  the other brokers realize the primary is gone and agree on the new
  primary.
- Clients must find the single active node, which may involve multiple
  connect attempts.
- No service if a partition separates a client from the active broker,
  even if the client can see other brokers.