-*-org-*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

* Status of implementation

Meaning of priorities:
[#A] Essential for basic functioning.
[#B] Required for first release.
[#C] Can be addressed in a later release.

The existing prototype is a bare-bones implementation for running performance benchmarks:
- Implements the publish and consumer locking protocols.
- Supports deferred delivery and asynchronous completion of messages.
- Optimizes the case where all consumers are on the same node.
- No new member updates, no failover updates, no transactions, no persistence, etc.

Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/

** Similarities to the existing cluster.

/Active-active/: the new cluster can be a drop-in replacement for the
old one; existing tests and customer deployment configurations remain
valid.

/Virtual synchrony/: uses corosync to coordinate the activity of members.

/XML controls/: uses XML to define the primitives multicast to the
cluster.

** Differences from the existing cluster.

/Report rather than predict consumption/: brokers explicitly tell each
other which messages have been acquired or dequeued. This removes the
major cause of bugs in the existing cluster.

/Queue consumer locking/: to avoid duplicates, only the broker that
holds a queue's consume-lock may acquire or dequeue messages from that
queue. If multiple brokers are consuming from the same queue, the lock
is passed around to time-share access to the queue.
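A minimal sketch of the consume-lock hand-off, assuming lock requests and releases are multicast and applied in the same CPG order on every broker, so all members agree on the owner without extra messages. ConsumeLock, BrokerId and the FIFO hand-off policy are hypothetical, not taken from the prototype.

```cpp
#include <cassert>
#include <deque>
#include <optional>

// Hypothetical model of one queue's consume-lock. Assumes request() and
// release() are driven by events delivered in the same CPG order on every
// broker, so every broker computes the same owner.
using BrokerId = int;

class ConsumeLock {
public:
    // Grant the lock at once if it is free, otherwise wait in FIFO order.
    void request(BrokerId broker) {
        if (!owner_) owner_ = broker;
        else waiting_.push_back(broker);
    }

    // The owner gives up the lock (e.g. at the end of its time slice) and
    // the next waiting broker, if any, becomes the owner.
    void release(BrokerId broker) {
        assert(canConsume(broker));  // only the owner may release
        if (waiting_.empty()) {
            owner_.reset();
        } else {
            owner_ = waiting_.front();
            waiting_.pop_front();
        }
    }

    // Only the owner may acquire or dequeue messages on the queue.
    bool canConsume(BrokerId broker) const {
        return owner_.has_value() && *owner_ == broker;
    }

private:
    std::optional<BrokerId> owner_;
    std::deque<BrokerId> waiting_;
};
```

To time-share, a broker that still has consumers would call request() again after release(), rejoining the back of the wait list.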

/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
the concurrency of the host) to allow concurrent processing on
different queues. Queues are hashed onto the groups.
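A sketch of mapping queues onto the CPG groups. groupFor and StringHash are hypothetical names. Because a queue's events must travel on the same group at every broker, this assumes both the hash function and numGroups are identical on all members. The hash is passed in so that a stable function can stand in for std::hash or boost::hash.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper: choose the CPG group that carries events for a queue.
// Every member must get the same answer, so both the hash function and
// numGroups must be identical on all brokers.
using StringHash = std::function<std::size_t(const std::string&)>;

std::size_t groupFor(const std::string& queueName, std::size_t numGroups,
                     const StringHash& hash) {
    assert(numGroups > 0);
    return hash(queueName) % numGroups;
}
```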

* Completed tasks
** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
CLOSED: [2011-10-05 Wed 16:03]

Defines the broker::Cluster interface and call points.
Initial interface commit.

Main classes:
- Core: central object holding the cluster classes together (replaces cluster::Cluster).
- BrokerContext: implements the broker::Cluster interface.
- QueueContext: attached to a broker::Queue, holds its cluster status.
- MessageHolder: holds local messages while they are being enqueued.

Implements multiple CPG groups for better concurrency.

** DONE [#A] Large message replication.
CLOSED: [2011-10-05 Wed 17:22]
Multicast using fixed-size (64k) buffers, allowing messages to be fragmented across buffers (frame by frame).
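A simplified sketch of splitting a message into fixed-size 64k multicast buffers and reassembling it. It treats the encoded message as a single byte string, whereas the prototype fragments frame by frame. fragment and reassemble are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simplified sketch: the encoded message is treated as one byte string and
// cut into fixed-size buffers. The prototype fragments frame by frame instead.
constexpr std::size_t kBufferSize = 64 * 1024;

// Split a message into buffers of at most bufferSize bytes.
std::vector<std::string> fragment(const std::string& message,
                                  std::size_t bufferSize = kBufferSize) {
    assert(bufferSize > 0);
    std::vector<std::string> buffers;
    for (std::size_t pos = 0; pos < message.size(); pos += bufferSize)
        buffers.push_back(message.substr(pos, bufferSize));
    return buffers;
}

// Receiving side: buffers are assumed to arrive in order (CPG delivers in a
// total order), so reassembly is concatenation.
std::string reassemble(const std::vector<std::string>& buffers) {
    std::string message;
    for (const auto& buffer : buffers) message += buffer;
    return message;
}
```

A 150,000-byte message needs three buffers: two full 65,536-byte buffers and one of 18,928 bytes.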

* Open questions

** TODO [#A] Queue sequence numbers vs. independent message IDs.
SCHEDULED: <2011-10-07 Fri>

The current prototype uses queue sequence numbers to identify
messages. This makes updating new members tricky, because the sequence
numbers are only known on delivery.

Independent message IDs that can be generated and sent with the message
would simplify this, and could improve performance by relaxing total ordering.
However, they imply additional map lookups that might hurt performance.

- [X] Prototype independent message IDs, check performance.
  Throughput was 30% worse in the contended case, 10% worse in the uncontended case.
  Sticking with queue sequence numbers.

* Outstanding Tasks
** TODO [#A] Defer and async completion of wiring commands.

Testing requirement: many tests assume wiring changes are visible
across the cluster once the command completes.

Name clashes: need to avoid a race if a queue or exchange with the same
name is declared on 2 brokers simultaneously.

** TODO [#A] Passing all existing cluster tests.

The new cluster should be a drop-in replacement for the old, so it
should be able to pass all the existing tests.

** TODO [#A] Update to new members joining.

Need to resolve [[Queue sequence numbers vs. independent message IDs]] first.
- implicit sequence numbers are trickier to replicate to a new member.
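The open question above was settled in favour of queue sequence numbers. These work without being sent with each message because CPG delivers enqueues to every member in the same total order, so each member can number messages itself and still agree. A hypothetical sketch follows; Replica and EnqueueEvent are illustrative names.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical sketch: each member numbers messages itself as enqueues are
// delivered. Because CPG delivers events in the same total order everywhere,
// every member assigns the same sequence number to the same message.
struct EnqueueEvent {
    std::string queue;
    std::string body;
};

class Replica {
public:
    // Apply an enqueue in delivery order and return its sequence number.
    std::uint64_t onEnqueue(const EnqueueEvent& e) {
        std::uint64_t seq = ++lastSeq_[e.queue];
        messages_[e.queue][seq] = e.body;
        return seq;
    }

    // Dequeue events identify a message by (queue, sequence number).
    bool onDequeue(const std::string& queue, std::uint64_t seq) {
        return messages_[queue].erase(seq) == 1;
    }

private:
    std::map<std::string, std::uint64_t> lastSeq_;  // per-queue counter
    std::map<std::string, std::map<std::uint64_t, std::string>> messages_;
};
```

This also shows why implicit numbers complicate new-member updates: a new member must learn each queue's current counter as well as its messages.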

Update individual objects (queues and exchanges) independently.
- create queues first, then update all queues and exchanges in parallel.
- multiple updater threads, per queue/exchange.
- updater sends messages to special exchange(s) (not using extended AMQP controls).

Queue updater:
- marks the queue position at the sync point.
- sends messages starting from the sync point, working towards the head of the queue.
- sends a "done" message.
Note: the updater remains active throughout; consuming clients actually reduce the
size of the update.

Queue updatee:
- enqueues received from CPG: add to back of queue as normal.
- dequeues received from CPG: apply if found, else save to check at end of update.
- messages from updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.
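A sketch of the queue updater and updatee steps above, with messages reduced to sequence numbers. updaterSend and QueueUpdatee are hypothetical names, and the linear search in erase() stands in for whatever index a real queue would use. Because the updater sends back-to-front and the updatee pushes each updater message onto the front, the original order is rebuilt.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <set>
#include <vector>

using Seq = std::uint64_t;

// Updater: the queue as it was at the sync point, sent from the sync point
// (the back) towards the head.
std::vector<Seq> updaterSend(const std::deque<Seq>& queueAtSyncPoint) {
    return std::vector<Seq>(queueAtSyncPoint.rbegin(), queueAtSyncPoint.rend());
}

class QueueUpdatee {
public:
    // Enqueue from CPG: add to the back as normal.
    void onCpgEnqueue(Seq s) { queue_.push_back(s); }
    // Dequeue from CPG: apply if found, else save for the end of the update.
    void onCpgDequeue(Seq s) {
        if (!erase(s)) pendingDequeues_.insert(s);
    }
    // Message from the updater: add to the front.
    void onUpdaterMessage(Seq s) { queue_.push_front(s); }
    // "done" from the updater: apply any saved dequeues.
    void onUpdateComplete() {
        for (Seq s : pendingDequeues_) erase(s);
        pendingDequeues_.clear();
    }
    const std::deque<Seq>& queue() const { return queue_; }

private:
    bool erase(Seq s) {
        auto it = std::find(queue_.begin(), queue_.end(), s);
        if (it == queue_.end()) return false;
        queue_.erase(it);
        return true;
    }
    std::deque<Seq> queue_;
    std::set<Seq> pendingDequeues_;
};
```

For example, with [1, 2, 3] on the queue at the sync point, a later enqueue of 4, and a dequeue of 1 that arrives before the updater has sent message 1, the updatee ends up with [2, 3, 4].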

Exchange updater:
- updater: send snapshot of exchange as it was at the sync point.

Exchange updatee:
- queue exchange operations after the sync point.
- when snapshot is received: apply saved operations.
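A matching sketch of the exchange updatee, with an exchange reduced to its set of binding keys. ExchangeUpdatee and BindingOp are hypothetical; real binding operations would also carry the queue name and arguments.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical binding operation delivered via CPG after the sync point.
struct BindingOp {
    bool bind;        // true = bind, false = unbind
    std::string key;
};

class ExchangeUpdatee {
public:
    // Before the snapshot arrives, save operations; afterwards apply them.
    void onOperation(const BindingOp& op) {
        if (haveSnapshot_) apply(op);
        else saved_.push_back(op);
    }
    // Install the sync-point snapshot, then replay the saved operations.
    void onSnapshot(std::set<std::string> snapshot) {
        bindings_ = std::move(snapshot);
        for (const auto& op : saved_) apply(op);
        saved_.clear();
        haveSnapshot_ = true;
    }
    const std::set<std::string>& bindings() const { return bindings_; }

private:
    void apply(const BindingOp& op) {
        if (op.bind) bindings_.insert(op.key);
        else bindings_.erase(op.key);
    }
    bool haveSnapshot_ = false;
    std::vector<BindingOp> saved_;
    std::set<std::string> bindings_;
};
```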

The updater remains active throughout.
The updatee stalls clients until the update completes.

Queue, exchange and binding objects are updated using the same
encode/decode that the store uses. Should the updatee use the recovery
interfaces to recover them?

** TODO [#A] Failover updates to client.
Implement the amq.failover exchange to notify clients of cluster membership.

** TODO [#B] Initial status protocol.
Handshake to give the status of each broker member to new members joining.
Status includes:
- persistent store state (clean, dirty).
- cluster protocol version.

** TODO [#B] Replace boost::hash with our own hash function.
The hash function is effectively part of the interface, so
we need to be sure it doesn't change underneath us.
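One possible replacement is 32-bit FNV-1a. Its offset basis and prime are fixed by the algorithm's definition, so the result cannot change with a Boost upgrade, compiler or platform. This is a sketch of that choice rather than a decision recorded here; fnv1a32 is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// 32-bit FNV-1a over the bytes of a string. The constants are part of the
// algorithm's definition, so every broker computes the same value.
std::uint32_t fnv1a32(const std::string& data) {
    std::uint32_t hash = 2166136261u;  // FNV-1a 32-bit offset basis
    for (unsigned char c : data) {
        hash ^= c;
        hash *= 16777619u;             // FNV 32-bit prime, wraps modulo 2^32
    }
    return hash;
}
```

0x811c9dc5 (empty string) and 0xe40c292c ("a") are the standard FNV-1a 32-bit test vectors, so a unit test can pin the function down.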

** TODO [#B] Persistent cluster support.
Initial status protocol to support persistent start-up (see existing code).

Only one broker recovers from the store; it then updates the others.

Assign cluster IDs to messages recovered from the store; don't replicate them. See Queue::recover.

** TODO [#B] Management support
Replicate management methods that modify queues, e.g. move, purge.
The target broker may not have all the messages held on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
- Queue::purge() - replicate the purge, or just delete what's on this broker?
- Queue::destroy() - send messages to the alternate exchange on all brokers?

Need to add call points and mcast messages to replicate these?

** TODO [#B] TX transaction support.
Extend the broker::Cluster interface to capture transaction context and completion.
Running brokers exchange TX information.
The update to a new broker includes TX information.

// FIXME aconway 2010-10-18: As things stand the cluster is not
// compatible with transactions
// - enqueues occur after routing is complete
// - no call to Cluster::enqueue, should be in Queue::process?
// - no transaction context associated with messages in the Cluster interface.
// - no call to Cluster::accept in Queue::dequeueCommitted

** TODO [#B] DTX transaction support.
Extend the broker::Cluster interface to capture transaction context and completion.
Running brokers exchange DTX information.
The update to a new broker includes DTX information.

** TODO [#B] Async completion of accept.
When this is fixed in the standalone broker, it should be fixed for the cluster too.

** TODO [#B] Network partitions and quorum.
Re-use the existing implementation.

** TODO [#B] Review error handling, put in a consistent model.
- [ ] Review all asserts for possible conversion to throws.
- [ ] Decide on fatal vs. non-fatal errors.

** TODO [#B] Implement inconsistent-error handling policy.
What to do if a message is enqueued successfully on the local broker,
but fails on one or more backups, e.g. due to store limits?
- we have more flexibility: we don't *have* to crash.
- but we've lost some of our redundancy guarantee; how should we inform the client?

** TODO [#C] Allow non-replicated exchanges, queues.

Set qpid.replicate=false in declare arguments; set a flag on Exchange and Queue objects.
- save replicated status to store.
- support in management tools.
Replicated queue: replicate all messages.
Replicated exchange: replicate bindings to replicated queues only.

Configurable default? Defaults to true.
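A sketch of reading the proposed qpid.replicate argument with the suggested default of true. Declare arguments are simplified to a string map (real AMQP field tables carry typed values), and isReplicated and replicateBinding are hypothetical helpers.

```cpp
#include <cassert>
#include <map>
#include <string>

// Simplified stand-in for declare arguments (real ones are typed values).
using Arguments = std::map<std::string, std::string>;

// A queue or exchange is replicated unless qpid.replicate=false is given.
bool isReplicated(const Arguments& args, bool defaultValue = true) {
    auto it = args.find("qpid.replicate");
    if (it == args.end()) return defaultValue;
    return it->second != "false";
}

// Replicated exchange: replicate a binding only if its queue is replicated.
bool replicateBinding(bool exchangeReplicated, bool queueReplicated) {
    return exchangeReplicated && queueReplicated;
}
```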

** TODO [#C] Refactoring of common concerns.

Several components act as "Queue observers", with intercept
points in similar places:
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity and simplify the code.

In particular, QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
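A hypothetical sketch of one shared observer interface that QueuePolicy, QueuedEvents, MessageStore and Cluster could each implement, instead of hooking into the queue separately. QueueObserver, ObservedQueue and CountingObserver are illustrative names, not existing broker classes.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical common interface for components that watch queue activity.
struct QueueObserver {
    virtual ~QueueObserver() = default;
    virtual void enqueued(const std::string& msg) = 0;
    virtual void dequeued(const std::string& msg) = 0;
};

// A queue with a single set of intercept points that notifies every observer.
class ObservedQueue {
public:
    void addObserver(std::shared_ptr<QueueObserver> o) {
        observers_.push_back(std::move(o));
    }
    void enqueue(const std::string& msg) {
        for (auto& o : observers_) o->enqueued(msg);
    }
    void dequeue(const std::string& msg) {
        for (auto& o : observers_) o->dequeued(msg);
    }

private:
    std::vector<std::shared_ptr<QueueObserver>> observers_;
};

// Example observer that just counts events, standing in for e.g. replication.
struct CountingObserver : QueueObserver {
    int enqueues = 0;
    int dequeues = 0;
    void enqueued(const std::string&) override { ++enqueues; }
    void dequeued(const std::string&) override { ++dequeues; }
};
```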

** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in the decision to deliver an immediate message?

** TODO [#C] Remove old cluster hacks and workarounds
The old cluster has workarounds in the broker code that can be removed.
- [ ] drop code to replicate management model.
- [ ] drop timer workarounds for TTL, management, heartbeats.
- [ ] drop "cluster-safe assertions" in broker code.
- [ ] drop connections, sessions, management from cluster update.
- [ ] drop security workarounds: cluster code now operates after message decoding.
- [ ] drop connection tracking in cluster code.
- [ ] simpler inconsistent-error handling code, no need to stall.

** TODO [#C] Support for live upgrades.

Allow brokers in a running cluster to be replaced one-by-one with a new version.
(See new-cluster-design for design notes.)

The old cluster protocol was unstable because any change in broker
state caused a change to the cluster protocol. The new design should be
much more stable.

Points to implement in anticipation of live upgrade:
- Prefix each CPG message with a version number and length.
  The version number determines how to decode the message.
- Brokers ignore messages that have a higher version number than they understand.
- Protocol version XML element in cluster.xml, on each control.
- Initial status protocol to include protocol version number.
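A sketch of the proposed CPG message prefix. The 16-bit version, 32-bit body length, big-endian byte order and the names encode, decode and kMyVersion are all assumptions. A broker decodes only versions it understands and ignores newer ones.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Assumed layout: 2-byte version, 4-byte body length, then the body.
constexpr std::uint16_t kMyVersion = 1;  // highest version this broker knows
constexpr std::size_t kHeaderSize = 6;

// Append v as a big-endian integer of the given width (1 to 4 bytes).
void putUint(std::string& out, std::uint32_t v, int bytes) {
    assert(bytes >= 1 && bytes <= 4);
    for (int i = bytes - 1; i >= 0; --i)
        out.push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}

// Read a big-endian integer of the given width starting at pos.
std::uint32_t getUint(const std::string& in, std::size_t pos, int bytes) {
    assert(bytes >= 1 && bytes <= 4 && pos + bytes <= in.size());
    std::uint32_t v = 0;
    for (int i = 0; i < bytes; ++i)
        v = (v << 8) | static_cast<unsigned char>(in[pos + i]);
    return v;
}

std::string encode(std::uint16_t version, const std::string& body) {
    std::string out;
    putUint(out, version, 2);
    putUint(out, static_cast<std::uint32_t>(body.size()), 4);
    return out + body;
}

// Returns the body, or nothing if the message is from a newer protocol
// version (ignored) or is malformed.
std::optional<std::string> decode(const std::string& msg) {
    if (msg.size() < kHeaderSize) return std::nullopt;
    auto version = static_cast<std::uint16_t>(getUint(msg, 0, 2));
    std::uint32_t length = getUint(msg, 2, 4);
    if (version > kMyVersion) return std::nullopt;
    if (msg.size() - kHeaderSize < length) return std::nullopt;
    return msg.substr(kHeaderSize, length);
}
```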

New member updates: use the store encode/decode for updates, and the
same backward compatibility strategy as the store. This allows
new elements to be added to the end of structures, but existing
elements cannot be changed or removed.

** TODO [#C] Support for AMQP 1.0.

* Testing
** TODO [#A] Pass all existing cluster tests.
Requires [[Defer and async completion of wiring commands.]]
** TODO [#A] New cluster tests.
Stress tests and performance benchmarks focused on changes in the new cluster:
- concurrency by queues rather than connections.
- different handling of shared queues when consumers are on different brokers.