-*-org-*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

* Status of implementation

Meaning of priorities:
[#A] Essential for basic functioning.
[#B] Required for first release.
[#C] Can be addressed in a later release.

The existing prototype is a bare-bones implementation for performance benchmarks:
- Implements the publish and consumer locking protocol.
- Deferred delivery and asynchronous completion of messages.
- Optimizes the case where all consumers are on the same node.
- No new member updates, no failover updates, no transactions, no persistence etc.

Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/

** Similarities to existing cluster.

/Active-active/: the new cluster can be a drop-in replacement for the
old one; existing tests and customer deployment configurations remain
valid.

/Virtual synchrony/: uses corosync to coordinate the activity of members.

/XML controls/: uses XML to define the primitives multicast to the
cluster.

** Differences with existing cluster.

/Report rather than predict consumption/: brokers explicitly tell each
other which messages have been acquired or dequeued. This removes the
major cause of bugs in the existing cluster.

/Queue consumer locking/: to avoid duplicates, only one broker at a time
can acquire or dequeue messages from a queue: the broker that holds the
queue's consume lock. If multiple brokers are consuming from the same
queue, the lock is passed around to time-share access to the queue.
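
A minimal sketch of the consume-lock hand-off, assuming lock requests and releases are multicast on the queue's CPG group and are therefore delivered in the same order on every member. MemberId, ConsumeLock, onRequest() and onRelease() are hypothetical names, not the prototype's classes.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>

// Hypothetical member identifier; a real cluster would use the CPG member ID.
using MemberId = std::uint32_t;

// Replicated state of one queue's consume lock. Every member applies the same
// multicast events in the same (virtually synchronous) order, so every member
// computes the same owner without any further negotiation.
class ConsumeLock {
  public:
    // Multicast by a member that has consumers on the queue.
    void onRequest(MemberId m) {
        if (!owner_) owner_ = m;                    // Lock is free: grant it.
        else if (*owner_ != m && !waiting(m)) waiting_.push_back(m);
    }

    // Multicast by the owner at the end of its time slice. The lock passes
    // to the longest-waiting member, or becomes free if nobody is waiting.
    void onRelease(MemberId m) {
        assert(owner_ && *owner_ == m);             // Only the owner may release.
        owner_.reset();
        if (!waiting_.empty()) {
            owner_ = waiting_.front();
            waiting_.pop_front();
        }
    }

    // A member may acquire or dequeue messages only while it owns the lock.
    bool mayConsume(MemberId m) const { return owner_ && *owner_ == m; }

    std::optional<MemberId> owner() const { return owner_; }

  private:
    bool waiting(MemberId m) const {
        for (MemberId w : waiting_)
            if (w == m) return true;
        return false;
    }

    std::optional<MemberId> owner_;
    std::deque<MemberId> waiting_;
};
```

An owner that still has consumers after releasing simply requests again, which puts it at the back of the line; that is what makes the access time-shared.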

/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
the concurrency of the host) to allow concurrent processing on
different queues. Queues are hashed onto the groups.
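
A sketch of hashing queues onto a fixed set of groups, assuming the group is chosen from the queue name. It uses 32-bit FNV-1a only as an example of a hash whose output is fully specified (the task below about replacing boost::hash points out that the hash is effectively part of the interface); fnv1a32() and groupFor() are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// 32-bit FNV-1a. Unlike boost::hash or std::hash, its output is fixed by
// its definition, so every broker maps a given name to the same value.
std::uint32_t fnv1a32(const std::string& s) {
    std::uint32_t h = 2166136261u;          // FNV offset basis.
    for (unsigned char c : s) {
        h ^= c;
        h *= 16777619u;                     // FNV prime (wraps modulo 2^32).
    }
    return h;
}

// Choose the CPG group that carries all traffic for a queue. nGroups must be
// the same on every member; events for one queue stay ordered within its
// group while different groups can be processed concurrently.
std::size_t groupFor(const std::string& queueName, std::size_t nGroups) {
    assert(nGroups > 0);
    return fnv1a32(queueName) % nGroups;
}
```

Changing either nGroups or the hash function remaps queues to different groups, so both have to agree across the whole cluster.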

* Completed tasks
** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
CLOSED: [2011-10-05 Wed 16:03]

Defines the broker::Cluster interface and call points.
Initial interface committed.

Main classes:
- Core: central object holding cluster classes together (replaces cluster::Cluster).
- BrokerContext: implements the broker::Cluster interface.
- QueueContext: attached to a broker::Queue, holds cluster status.
- MessageHolder: holds local messages while they are being enqueued.

Implements multiple CPG groups for better concurrency.

** DONE [#A] Large message replication.
CLOSED: [2011-10-05 Wed 17:22]
Multicast uses fixed-size (64k) buffers and allows messages to be
fragmented across buffers (frame by frame).
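
A sketch of packing frames into fixed-size multicast buffers, assuming frames are already encoded, are self-delimiting (so a receiver can split a buffer back into frames) and are never larger than one buffer. Frame, Buffer, kBufferSize and FramePacker are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins: an encoded frame is a byte string, and a buffer
// is the payload of one CPG multicast.
using Frame = std::string;
using Buffer = std::string;

const std::size_t kBufferSize = 64 * 1024;

// Packs encoded frames into fixed-size buffers. A buffer is flushed when the
// next frame does not fit, so one large message may span several buffers,
// but an individual frame is never split.
class FramePacker {
  public:
    explicit FramePacker(std::size_t capacity = kBufferSize) : capacity_(capacity) {}

    void add(const Frame& f) {
        assert(f.size() <= capacity_);      // Each frame must fit in one buffer.
        if (current_.size() + f.size() > capacity_) flush();
        current_ += f;
    }

    // Emit a partially filled buffer, e.g. when nothing else is pending.
    void flush() {
        if (!current_.empty()) {
            ready_.push_back(current_);
            current_.clear();
        }
    }

    const std::vector<Buffer>& ready() const { return ready_; }

  private:
    std::size_t capacity_;
    Buffer current_;
    std::vector<Buffer> ready_;
};
```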

* Design Questions
** [[Queue sequence numbers vs. independant message IDs]]

The current prototype uses a queue+sequence number to identify a
message. This is tricky for updating new members, as the sequence
numbers are only known on delivery.

Independent message IDs that can be generated and sent as part of the
message simplify this and potentially allow performance benefits by
relaxing total ordering. However, they require additional map lookups
that hurt performance.

- [X] Prototype independent message IDs, check performance.
  Throughput is 30% worse in the contended case, 10% worse in the uncontended case.
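
A sketch of what independent message IDs involve, assuming an ID made of the originating member's ID plus a counter local to that member (one possible scheme; the plan does not say how IDs are built). MessageId, IdGenerator and QueueIndex are hypothetical. The ID exists before the message is multicast, but each dequeue reported by another broker now costs a map lookup.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>

// Hypothetical independent message ID: originating member + per-member counter.
struct MessageId {
    std::uint32_t member;
    std::uint32_t counter;
    bool operator==(const MessageId& o) const {
        return member == o.member && counter == o.counter;
    }
};

struct MessageIdHash {
    std::size_t operator()(const MessageId& id) const {
        std::uint64_t key = (std::uint64_t(id.member) << 32) | id.counter;
        return std::hash<std::uint64_t>{}(key);
    }
};

// Each broker generates IDs for the messages it publishes.
class IdGenerator {
  public:
    explicit IdGenerator(std::uint32_t member) : member_(member) {}
    MessageId next() { return MessageId{member_, counter_++}; }

  private:
    std::uint32_t member_;
    std::uint32_t counter_ = 0;
};

// Per-queue index from ID to local queue position. With sequence numbers the
// reported value is the position itself and this lookup is not needed.
class QueueIndex {
  public:
    void onEnqueue(const MessageId& id, std::uint64_t position) { positions_[id] = position; }

    // If the message is known, remove it, set position and return true.
    bool onDequeue(const MessageId& id, std::uint64_t& position) {
        auto it = positions_.find(id);
        if (it == positions_.end()) return false;
        position = it->second;
        positions_.erase(it);
        return true;
    }

    std::size_t size() const { return positions_.size(); }

  private:
    std::unordered_map<MessageId, std::uint64_t, MessageIdHash> positions_;
};
```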

* Tasks to match existing cluster
** TODO [#A] Review old cluster code for more tasks. 1
** TODO [#A] Put cluster enqueue after all policy & other checks.

gsim: we do the policy check after multicasting the enqueue, so we
could have an inconsistent outcome.

aconway: the multicast should come after the enqueue and any other code
that may decide whether or not to send the message.

gsim: while later is better, is moving it that late the right thing?
That will mean, for example, that any dequeues triggered by the enqueue
(e.g. ring queue or LVQ) will happen before the enqueue is broadcast.

** TODO [#A] Defer and async completion of wiring commands. 5
Testing requirement: many tests assume wiring changes are visible
across the cluster once the wiring command completes.

Name clashes: avoid a race if a queue/exchange with the same name is
declared on 2 brokers simultaneously.

Ken's async accept work, never merged: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/

Clashes with non-replicated: see [[Allow non-replicated]] below.

** TODO [#A] Defer & async completion for explicit accept.

Explicit accept currently ignores the consume lock. Defer it and
complete it when the lock is acquired.

** TODO [#A] Updates for new members joining. 10

Need to resolve [[Queue sequence numbers vs. independant message IDs]]
first.
- implicit sequence numbers are trickier to replicate to a new member.

Update individual objects (queues and exchanges) independently.
- create queues first, then update all queues and exchanges in parallel.
- multiple updater threads, per queue/exchange.
- updater sends messages to special exchange(s) (not using extended AMQP controls).

Queue updater:
- marks the queue position at the sync point.
- sends messages starting from the sync point, working towards the head of the queue.
- sends a "done" message.
Note: the updater remains active throughout; consuming clients actually
reduce the size of the update.

Queue updatee:
- enqueues received from CPG: add to the back of the queue as normal.
- dequeues received from CPG: apply if found, else save to check at the end of the update.
- messages from updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.
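
A single-threaded sketch of the queue updater and updatee steps above, assuming every message carries an identifier that is the same on all brokers (which ties back to the design question on message IDs). Msg, updaterSendOrder() and QueueUpdatee are hypothetical; for simplicity the updater works from a snapshot rather than the live queue, and lookups are linear scans.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <set>
#include <vector>

using Msg = std::uint64_t;  // Hypothetical cluster-wide message identifier.

// Updater: send the queue as it was at the sync point, starting at the sync
// point (the tail) and working towards the head.
std::vector<Msg> updaterSendOrder(const std::deque<Msg>& queueAtSyncPoint) {
    return std::vector<Msg>(queueAtSyncPoint.rbegin(), queueAtSyncPoint.rend());
}

class QueueUpdatee {
  public:
    // Enqueue from CPG: normal path, add to the back.
    void cpgEnqueue(Msg m) { queue_.push_back(m); }

    // Dequeue from CPG: apply if the message is here, else save it for later.
    void cpgDequeue(Msg m) {
        if (!erase(m)) pendingDequeues_.insert(m);
    }

    // Message from the updater: older than anything already held, so it
    // goes to the front.
    void fromUpdater(Msg m) { queue_.push_front(m); }

    // Update complete: apply dequeues that arrived before their message.
    void updateDone() {
        for (Msg m : pendingDequeues_) erase(m);
        pendingDequeues_.clear();
    }

    const std::deque<Msg>& queue() const { return queue_; }

  private:
    bool erase(Msg m) {
        auto it = std::find(queue_.begin(), queue_.end(), m);
        if (it == queue_.end()) return false;
        queue_.erase(it);
        return true;
    }

    std::deque<Msg> queue_;
    std::set<Msg> pendingDequeues_;
};
```

Saving unmatched dequeues covers the race where a client on another broker consumes a message the updater has not sent yet: the dequeue reaches the updatee first and is applied once the update completes.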

Exchange updater:
- send a snapshot of the exchange as it was at the sync point.

Exchange updatee:
- queue exchange operations received after the sync point.
- when the snapshot is received: apply the saved operations.

Updater remains active throughout.
Updatee stalls clients until the update completes.

Updating queue/exchange/binding objects is via the same encode/decode
that is used by the store. Should the updatee use the recovery
interfaces to recover?

** TODO [#A] Failover updates to client. 2
Implement the amq.failover exchange to notify clients of membership.
** TODO [#A] Passing all existing cluster tests. 5

The new cluster should be a drop-in replacement for the old, so it
should be able to pass all the existing tests.

** TODO [#B] Initial status protocol. 3
Handshake to give the status of each broker member to new members joining.
Status includes:
- cluster protocol version.
- persistent store state (clean, dirty).
- make it extensible, so additional state can be added in new protocols.
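
One way to make the status extensible, sketched under assumptions: each field is encoded as a 1-byte tag, a 2-byte length and the value, and a decoder skips tags it does not recognise, so a newer broker can add fields without breaking older ones. The tag values, Status, encodeStatus() and decodeStatus() are hypothetical; the plan only says what the status should contain.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical field tags. New protocol versions may add tags.
enum Tag : std::uint8_t { PROTOCOL_VERSION = 1, STORE_STATE = 2 };
enum class StoreState : std::uint8_t { NONE = 0, CLEAN = 1, DIRTY = 2 };

struct Status {
    std::uint32_t protocolVersion = 0;
    StoreState store = StoreState::NONE;
};

// Append one tag-length-value field (length is 2 bytes, big-endian).
static void putField(std::string& out, std::uint8_t tag, const std::string& value) {
    assert(value.size() <= 0xFFFF);
    out += static_cast<char>(tag);
    out += static_cast<char>(value.size() >> 8);
    out += static_cast<char>(value.size() & 0xFF);
    out += value;
}

std::string encodeStatus(const Status& s) {
    std::string version;
    for (int shift = 24; shift >= 0; shift -= 8)
        version += static_cast<char>((s.protocolVersion >> shift) & 0xFF);
    std::string out;
    putField(out, PROTOCOL_VERSION, version);
    putField(out, STORE_STATE, std::string(1, static_cast<char>(s.store)));
    return out;
}

// Returns false if the input is truncated. Unknown tags are skipped.
bool decodeStatus(const std::string& in, Status& s) {
    auto at = [&](std::size_t k) { return static_cast<std::uint8_t>(in[k]); };
    std::size_t i = 0;
    while (i < in.size()) {
        if (in.size() - i < 3) return false;                 // Truncated header.
        std::uint8_t tag = at(i);
        std::size_t len = (std::size_t(at(i + 1)) << 8) | at(i + 2);
        i += 3;
        if (in.size() - i < len) return false;               // Truncated value.
        if (tag == PROTOCOL_VERSION && len == 4) {
            s.protocolVersion = 0;
            for (std::size_t k = 0; k < 4; ++k)
                s.protocolVersion = (s.protocolVersion << 8) | at(i + k);
        } else if (tag == STORE_STATE && len == 1) {
            s.store = static_cast<StoreState>(at(i));
        }
        i += len;
    }
    return true;
}
```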

The store is clean if this broker was the last man standing or was shut down cleanly.
Need to add multicast controls for shutdown.

** TODO [#B] Persistent cluster startup. 4

Based on existing code:
- Dirty/clean store state is exchanged in the initial status.
- Only one broker recovers from the store; the others receive an update.
** TODO [#B] Replace boost::hash with our own hash function. 1
The hash function is effectively part of the interface, so
we need to be sure it doesn't change underneath us.

** TODO [#B] Management model. 3
Alerts for inconsistent message loss.

** TODO [#B] Management methods that modify queues. 5

Replicate management methods that modify queues, e.g. move, purge.
For purge/destroy, the target broker may not have all the messages held on other brokers.
- Queue::purge() - wait for lock, purge local, mcast dequeues.
- Queue::move() - wait for lock, move msgs (mcasts enqueues), mcast dequeues.
- Queue::destroy() - messages to alternate exchange on all brokers.
- Queue::get() - ???
Need to add callpoints & mcast messages to replicate these?
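
A sketch of the Queue::purge() steps above (wait for lock, purge local, mcast dequeues), assuming the caller already holds the queue's consume lock and that one dequeue is multicast per message. Msg, purgeAndReplicate() and applyDequeue() are hypothetical; the multicast is passed in as a callback.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <vector>

using Msg = std::uint64_t;  // Hypothetical cluster-wide message identifier.

// Precondition: the caller holds the queue's consume lock, so no other broker
// is acquiring or dequeuing. Each local message is removed and a dequeue is
// multicast for it.
std::size_t purgeAndReplicate(std::deque<Msg>& localQueue,
                              const std::function<void(Msg)>& mcastDequeue) {
    std::size_t purged = 0;
    while (!localQueue.empty()) {
        Msg m = localQueue.front();
        localQueue.pop_front();     // Purge the local copy...
        mcastDequeue(m);            // ...and report it to the other brokers.
        ++purged;
    }
    return purged;
}

// What another broker does when the multicast dequeue is delivered.
void applyDequeue(std::deque<Msg>& localQueue, Msg m) {
    for (auto it = localQueue.begin(); it != localQueue.end(); ++it)
        if (*it == m) { localQueue.erase(it); return; }
}
```

Other brokers only drop messages the target broker had, so a message held elsewhere but not on the target survives the purge; that is the gap noted above.
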
** TODO [#B] TX transaction support. 5
Extend the broker::Cluster interface to capture transaction context and completion.
Running brokers exchange TX information.
New broker update includes TX information.

// FIXME aconway 2010-10-18: As things stand the cluster is not
// compatible with transactions
// - enqueues occur after routing is complete
// - no call to Cluster::enqueue, should be in Queue::process?
// - no transaction context associated with messages in the Cluster interface.
// - no call to Cluster::accept in Queue::dequeueCommitted

Injecting holes into a queue:
- Multicast a 'non-message' that just consumes one queue position.
- Used to reserve a message ID (position) for an uncommitted message.
- Could also allow non-replicated messages on a replicated queue if required.
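
A sketch of a queue whose positions may be holes, assuming positions are assigned in multicast delivery order so every broker numbers them the same way. PositionedQueue, reserveHole(), fill() and discard() are hypothetical. Here delivery skips unfilled holes; whether a real queue should skip them or wait on them is left open by the plan.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Every position holds either a message or a hole (no content yet).
class PositionedQueue {
  public:
    using Position = std::uint64_t;

    // Append a real message at the next position.
    Position enqueue(const std::string& body) {
        slots_[next_] = body;
        return next_++;
    }

    // Reserve a position, e.g. for a message in an uncommitted transaction.
    Position reserveHole() {
        slots_[next_] = std::nullopt;
        return next_++;
    }

    // Fill a hole, e.g. when the transaction commits.
    void fill(Position p, const std::string& body) {
        auto it = slots_.find(p);
        assert(it != slots_.end() && !it->second);  // Must be an unfilled hole.
        it->second = body;
    }

    // Drop a hole, e.g. when the transaction rolls back.
    void discard(Position p) { slots_.erase(p); }

    // Deliver the first real message in position order, skipping holes.
    std::optional<std::string> pop() {
        for (auto it = slots_.begin(); it != slots_.end(); ++it) {
            if (it->second) {
                std::string body = *it->second;
                slots_.erase(it);
                return body;
            }
        }
        return std::nullopt;
    }

  private:
    std::map<Position, std::optional<std::string>> slots_;
    Position next_ = 0;
};
```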

** TODO [#B] DTX transaction support. 5
Extend the broker::Cluster interface to capture transaction context and completion.
Running brokers exchange DTX information.
New broker update includes DTX information.

** TODO [#B] Replicate message groups?
Message groups may require additional state to be replicated.
** TODO [#B] Replicate state for Fairshare?
gsim: fairshare would need explicit code to keep it in sync across
nodes; that may not be required however.
** TODO [#B] Timed auto-delete queues?
gsim: may need specific attention?
** TODO [#B] Async completion of accept. 4
When this is fixed in the standalone broker, it should be fixed for the cluster.

** TODO [#B] Network partitions and quorum. 2
Re-use existing implementation.

** TODO [#B] Review error handling, put in a consistent model. 4
- [ ] Review all asserts for possible throws.
- [ ] Decide on fatal vs. non-fatal errors.

** TODO [#B] Implement inconsistent error handling policy. 5
What to do if a message is enqueued successfully on some broker(s),
but fails on other(s), e.g. due to store limits?
- fail on local broker = possible message loss.
- fail on non-local broker = possible duplication.

We have more flexibility now: we don't *have* to crash,
but we've lost some of our redundancy guarantee. How do we inform the user?

Options to respond to inconsistent error:
- stop broker
- reset broker (exec a new qpidd)
- reset queue
- log critical
- send management event

Most important is to inform of the risk of message loss.
Current favourite: reset queue + log critical + management event.
Configurable choices?
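
If the choices are made configurable, the response could be a set of flags so that combinations such as the current favourite can be expressed. This sketch assumes a comma-separated option value; ErrorAction, kDefaultPolicy, parsePolicy() and the option names are hypothetical, not existing qpidd options.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical responses to an inconsistent error, combinable as flags.
enum ErrorAction : std::uint32_t {
    STOP_BROKER      = 1u << 0,
    RESET_BROKER     = 1u << 1,
    RESET_QUEUE      = 1u << 2,
    LOG_CRITICAL     = 1u << 3,
    MANAGEMENT_EVENT = 1u << 4,
};

// The current favourite: reset queue + log critical + management event.
const std::uint32_t kDefaultPolicy = RESET_QUEUE | LOG_CRITICAL | MANAGEMENT_EVENT;

// Parse e.g. "reset-queue,log-critical". On an unknown or empty item,
// returns false and leaves policy unchanged.
bool parsePolicy(const std::string& text, std::uint32_t& policy) {
    static const struct { const char* name; ErrorAction action; } names[] = {
        {"stop-broker", STOP_BROKER},   {"reset-broker", RESET_BROKER},
        {"reset-queue", RESET_QUEUE},   {"log-critical", LOG_CRITICAL},
        {"management-event", MANAGEMENT_EVENT},
    };
    std::uint32_t result = 0;
    std::size_t start = 0;
    while (start <= text.size()) {
        std::size_t end = text.find(',', start);
        if (end == std::string::npos) end = text.size();
        std::string item = text.substr(start, end - start);
        bool found = false;
        for (const auto& n : names)
            if (item == n.name) { result |= n.action; found = true; }
        if (!found) return false;
        start = end + 1;
    }
    policy = result;
    return true;
}
```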

** TODO [#C] Allow non-replicated exchanges, queues. 5

3 levels set in declare arguments:
- qpid.replicate=no - nothing is replicated.
- qpid.replicate=wiring - queues/exchanges are replicated but not messages.
- qpid.replicate=yes - queues, exchanges and messages are replicated.
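
A sketch of interpreting the qpid.replicate declare argument. The argument name and its three values come from the plan; representing declare arguments as a string map, rejecting unknown values, and passing the default in (the default is still an open question) are assumptions. ReplicateLevel and replicateLevel() are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

enum class ReplicateLevel { NONE, WIRING, ALL };

// Returns the level requested by the declare arguments, the default if the
// argument is absent, or nullopt for an invalid value (reject the declare).
std::optional<ReplicateLevel> replicateLevel(
    const std::map<std::string, std::string>& args, ReplicateLevel dflt) {
    auto it = args.find("qpid.replicate");
    if (it == args.end()) return dflt;
    if (it->second == "no") return ReplicateLevel::NONE;
    if (it->second == "wiring") return ReplicateLevel::WIRING;
    if (it->second == "yes") return ReplicateLevel::ALL;
    return std::nullopt;
}

// Queues/exchanges are replicated at "wiring" and "yes"; messages only at "yes".
inline bool replicatesWiring(ReplicateLevel l) { return l != ReplicateLevel::NONE; }
inline bool replicatesMessages(ReplicateLevel l) { return l == ReplicateLevel::ALL; }
```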

Wiring use case: it's OK to lose some messages (up to the max depth of
the queue) but the queue/exchange structure must be highly available
so clients can resume communication after failover.

Configurable default? Default same as old cluster?

Need to
- save replicated status to the store (in arguments).
- support in management tools.

Avoid name clashes between replicated/non-replicated: multicast
local-only names as well; all brokers keep a map and refuse to create
clashes.

** TODO [#C] Handling immediate messages in a cluster. 2
Include remote consumers in the decision to deliver an immediate message.
* Improvements over existing cluster
** TODO [#C] Remove old cluster hacks and workarounds.
The old cluster has workarounds in the broker code that can be removed.
- [ ] drop code to replicate management model.
- [ ] drop timer workarounds for TTL, management, heartbeats.
- [ ] drop "cluster-safe assertions" in broker code.
- [ ] drop connections, sessions, management from cluster update.
- [ ] drop security workarounds: cluster code now operates after message decoding.
- [ ] drop connection tracking in cluster code.
- [ ] simpler inconsistent-error handling code, no need to stall.

** TODO [#C] Support for live upgrades.

Allow brokers in a running cluster to be replaced one-by-one with a new version.
(See new-cluster-design for design notes.)

The old cluster protocol was unstable because any changes in broker
state caused changes to the cluster protocol. The new design should be
much more stable.

Points to implement in anticipation of live upgrade:
- Prefix each CPG message with a version number and length.
  The version number determines how to decode the message.
- Brokers ignore messages that have a higher version number than they understand.
- Protocol version XML element in cluster.xml, on each control.
- Initial status protocol to include protocol version number.
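
A sketch of the version-and-length prefix, assuming a 2-byte version and a 4-byte length, both big-endian; the field sizes, kMyVersion, encodeCpgMessage() and decodeCpgMessage() are hypothetical. The length lets a broker find the end of a message it cannot decode, and the decoder returns nothing for a version newer than the broker understands, so the caller ignores that message.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

const std::uint16_t kMyVersion = 1;  // Highest version this broker understands.

// Prefix: 2-byte version, 4-byte body length, then the body.
std::string encodeCpgMessage(std::uint16_t version, const std::string& body) {
    std::string out;
    out += static_cast<char>(version >> 8);
    out += static_cast<char>(version & 0xFF);
    std::uint32_t len = static_cast<std::uint32_t>(body.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        out += static_cast<char>((len >> shift) & 0xFF);
    return out + body;
}

// Returns the body, or nullopt if the message is malformed or comes from a
// newer protocol version (the caller ignores it).
std::optional<std::string> decodeCpgMessage(const std::string& msg,
                                            std::uint16_t understood = kMyVersion) {
    if (msg.size() < 6) return std::nullopt;
    auto b = [&](std::size_t i) { return static_cast<std::uint8_t>(msg[i]); };
    std::uint16_t version = static_cast<std::uint16_t>((b(0) << 8) | b(1));
    std::uint32_t len = (std::uint32_t(b(2)) << 24) | (std::uint32_t(b(3)) << 16) |
                        (std::uint32_t(b(4)) << 8) | b(5);
    if (msg.size() - 6 != len) return std::nullopt;   // Malformed.
    if (version > understood) return std::nullopt;    // Newer protocol: ignore.
    return msg.substr(6);
}
```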

New member updates: use the store encode/decode for updates, and use the
same backward compatibility strategy as the store. This allows for
adding new elements to the end of structures but not changing or
removing existing elements.

NOTE: Any change to the association of CPG group names and queues will
break compatibility. How to work around this?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "Queue observers" with intercept
points in similar places.
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code.

In particular, QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
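
A sketch of a common observer interface for these concerns, assuming each one only needs to hear about enqueues and dequeues by queue position. QueueObserver, ObservedQueue and DepthCounter are hypothetical names for illustration; the real components need richer call points (for example a policy check that can reject an enqueue).

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical common interface for components that intercept queue events.
struct QueueObserver {
    virtual ~QueueObserver() = default;
    virtual void enqueued(const std::string& queue, std::uint64_t position) = 0;
    virtual void dequeued(const std::string& queue, std::uint64_t position) = 0;
};

// The queue notifies every registered observer at the same call points,
// instead of each concern having its own hook in the queue code.
class ObservedQueue {
  public:
    explicit ObservedQueue(std::string name) : name_(std::move(name)) {}

    void addObserver(std::shared_ptr<QueueObserver> o) { observers_.push_back(std::move(o)); }

    std::uint64_t enqueue() {
        std::uint64_t p = next_++;
        for (auto& o : observers_) o->enqueued(name_, p);
        return p;
    }

    void dequeue(std::uint64_t p) {
        assert(p < next_);                  // Position must have been issued.
        for (auto& o : observers_) o->dequeued(name_, p);
    }

  private:
    std::string name_;
    std::uint64_t next_ = 0;
    std::vector<std::shared_ptr<QueueObserver>> observers_;
};

// Example observer: tracks queue depth, as a policy might.
struct DepthCounter : QueueObserver {
    long depth = 0;
    void enqueued(const std::string&, std::uint64_t) override { ++depth; }
    void dequeued(const std::string&, std::uint64_t) override { --depth; }
};
```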

** TODO [#C] Support for AMQP 1.0.

* Testing
** TODO [#A] Pass all existing cluster tests.
Requires [[Defer and async completion of wiring commands.]]
** TODO [#A] New cluster tests.
Stress tests & performance benchmarks focused on changes in new cluster:
- concurrency by queues rather than connections.
- different handling of shared queues when consumers are on different brokers.