-*-org-*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


Notes on new cluster implementation. See also: new-cluster-design.txt

* Implementation plan.

Co-existence with old cluster code and tests:
- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
- Double up tests with old version/new version as the new code develops.

Minimal POC for message delivery & perf test.
- no wiring replication, no updates, no failover, no persistence, no async completion.
- just implement publish and acquire/dequeue locking protocol.
- optimize the special case where all consumers are on the same node.
- measure performance: compare active-passive and active-active modes of use.

Full implementation of transient cluster
- Update (based on existing update), async completion etc.
- Passing all existing transient cluster tests.

Persistent cluster
- Make sure async completion works correctly.
- InitialStatus protocol etc. to support persistent start-up (existing code).
- cluster restart from store: stores not identical. Load one, update the rest.
- assign cluster IDs to messages recovered from store, don't replicate.

Improved update protocol
- per-queue, less stalling, bounded catch-up.

* Task list

** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.

NOTE: as implementation questions arise, take the easiest option and make
a note for later optimization/improvement.

*** Tests
- python test: 4 senders, numbered messages, 4 receivers, verify message set.
- acquire then release messages: verify they can be dequeued on any member.
- acquire then kill broker: verify they can be dequeued on other members.
- acquire then reject: verify the message goes to the alt-exchange once only.

*** DONE broker::Cluster interface and call points.

Initial interface committed.

*** Main classes

BrokerHandler:
- implements broker::Cluster intercept points.
- sends mcast events to inform the cluster of local actions.
- thread safe, called in connection threads.

LocalMessageMap:
- holds local messages while they are being enqueued.
- thread safe: called by both BrokerHandler and MessageHandler.

MessageHandler:
- handles delivered mcast messages related to messages.
- initiates local actions in response to mcast events.
- thread unsafe, only called in the deliver thread.
- maintains view of cluster state regarding messages.

QueueOwnerHandler:
- handles delivered mcast messages related to queue consumer ownership.
- thread safe, called in deliver, connection and timer threads.
- maintains view of cluster state regarding queue ownership.

cluster::Core: class to hold the new cluster together (replaces cluster::Cluster)
- thread safe: manages state used by both MessageHandler and BrokerHandler.

The following code sketch illustrates only the "happy path"; error
handling is omitted.

*** BrokerHandler
Types:
- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }

NOTE:
- Messages on queues are identified by a queue name + a position.
- Messages being routed are identified by a sequence number.
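
A minimal C++ sketch of these two kinds of identifier (the typedefs and
names are illustrative assumptions, not existing code):

  #include <string>
  #include <stdint.h>

  typedef uint64_t NodeId;           // assumed representations
  typedef uint64_t SequenceNumber;
  typedef std::string QueueName;

  // A message being routed: identified by sending node + sequence number.
  struct MessageId {
      NodeId sender;
      SequenceNumber seq;
      bool operator<(const MessageId& x) const {
          return sender < x.sender || (sender == x.sender && seq < x.seq);
      }
  };

  // A message on a queue: identified by queue name + position.
  struct QueuePosition {
      QueueName q;
      SequenceNumber position;
  };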

Members:
- thread_local bool noReplicate // suppress replication.
- thread_local bool isRouting // suppress operations while routing.
- Message localMessage[SequenceNumber] // local messages being routed.
- thread_local SequenceNumber routingSequence

NOTE: localMessage is also modified by MessageHandler.

broker::Cluster intercept functions:

routing(msg)
  if noReplicate: return
  # Suppress everything except enqueues while we are routing.
  # We don't want to replicate acquires & dequeues caused by an enqueue,
  # e.g. removal of messages from ring/LV queues.
  isRouting = true

enqueue(qmsg):
  if noReplicate: return
  if routingSequence == 0 # thread local
    routingSequence = nextRoutingSequence()
  mcast create(encode(qmsg.msg), routingSequence)
  mcast enqueue(qmsg.q, routingSequence)

routed(msg):
  if noReplicate: return
  isRouting = false

acquire(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  if qmsg.msg.id: mcast acquire(qmsg)

release(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  mcast release(qmsg)

accept(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  mcast accept(qmsg)

reject(qmsg):
  isRejecting = true
  mcast reject(qmsg)

# FIXME no longer needed?
drop(qmsg):
  cleanup(qmsg)
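
The "with noReplicate=true:" idiom used by MessageHandler below could be
implemented as a small RAII guard over the thread-local flag. A sketch,
assuming a GCC-style __thread variable (the names are illustrative):

  extern __thread bool noReplicate;   // thread-local suppression flag

  class ScopedNoReplicate {
    public:
      ScopedNoReplicate() : saved(noReplicate) { noReplicate = true; }
      ~ScopedNoReplicate() { noReplicate = saved; }
    private:
      bool saved;
  };

  // Usage: local actions taken inside the scope are not re-replicated.
  //   { ScopedNoReplicate s; queue->push(msg); }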

*** MessageHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
- struct QueueKey { MessageId id; QueueName q; }
- typedef map<QueueKey, QueueEntry> Queue
- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }

Members:
- QueueEntry enqueued[QueueKey]
- Node node[NodeId]
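
Since QueueKey is used as a map key it needs an ordering. A sketch of how
these types might look in C++ (std::map/std::list chosen for illustration,
building on the MessageId sketch earlier; Message and QueuedMessage as in
the Types notes above):

  #include <map>
  #include <list>

  struct QueueKey {
      MessageId id;
      QueueName q;
      bool operator<(const QueueKey& x) const {
          return id < x.id || (!(x.id < id) && q < x.q);
      }
  };

  struct QueueEntry { QueuedMessage qmsg; NodeId acquired; };

  struct Node {
      std::map<SequenceNumber, Message> routing; // messages being routed
      std::list<QueueKey> acquired;              // entries this node holds
  };

  std::map<QueueKey, QueueEntry> enqueued;
  std::map<NodeId, Node> node;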

Mcast messages in Message class:

create(msg,seq)
  if sender != self: node[sender].routing[seq] = decode(msg)

enqueue(q,seq):
  id = (sender,seq)
  if sender == self:
    enqueued[id,q] = (localMessage[seq], acquired=None)
  else:
    msg = node[sender].routing[seq]
    with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
    enqueued[id,q] = (qmsg, acquired=None)

routed(seq):
  if sender == self: localMessage.erase(seq)
  else: node[sender].routing.erase(seq)

acquire(id,q):
  enqueued[id,q].acquired = sender
  node[sender].acquired.push_back((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])

release(id,q):
  enqueued[id,q].acquired = None
  node[sender].acquired.erase((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])

reject(id,q):
  node[sender].routing[id] = enqueued[id,q] # prepare for re-queueing

rejected(id,q):
  node[sender].routing.erase(id)

dequeue(id,q):
  entry = enqueued[id,q]
  enqueued.erase((id,q))
  node[entry.acquired].acquired.erase((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)

member m leaves cluster:
  for key in node[m].acquired:
    release(key.id, key.q)
  node.erase(m)

*** Queue consumer locking

When a queue is locked it does not deliver messages to its consumers.

New broker::Queue functions:
- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
- startConsumers(): reset consumersStopped flag.

Implementation sketch, locking omitted:

void Queue::stopConsumers() {
  consumersStopped = true;
  while (consumersBusy) consumersBusyMonitor.wait();
}

void Queue::startConsumers() {
  consumersStopped = false;
  listeners.notify();
}

bool Queue::dispatch(consumer) {
  if (consumersStopped) return false;
  ++consumersBusy;
  bool ok = do_regular_dispatch_body();
  if (--consumersBusy == 0) consumersBusyMonitor.notify();
  return ok;
}
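
With the locking added, the same sketch might look like this (assuming a
monitor class along the lines of qpid::sys::Monitor; details are
illustrative, not a final implementation):

  void Queue::stopConsumers() {
      Monitor::ScopedLock l(consumersBusyMonitor);
      consumersStopped = true;
      while (consumersBusy) consumersBusyMonitor.wait(); // drain busy dispatches
  }

  void Queue::startConsumers() {
      Monitor::ScopedLock l(consumersBusyMonitor);
      consumersStopped = false;
      listeners.notify();                  // wake consumers blocked on the queue
  }

  bool Queue::dispatch(Consumer& c) {
      {
          Monitor::ScopedLock l(consumersBusyMonitor);
          if (consumersStopped) return false;
          ++consumersBusy;                 // count this dispatch as busy
      }
      bool ok = doDispatch(c);             // regular dispatch body, not locked
      {
          Monitor::ScopedLock l(consumersBusyMonitor);
          if (--consumersBusy == 0) consumersBusyMonitor.notifyAll();
      }
      return ok;
  }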

*** QueueOwnerHandler

Invariants:
- Each queue is owned by at most one node at any time.
- Each node is interested in a set of queues at any given time.
- A queue is un-owned if no node is interested.

The queue owner releases the queue when
- it loses interest, i.e. the queue has no consumers with credit, or
- a configured time delay expires and there are other interested nodes.

The owner mcasts release(q). On delivery, the new queue owner is the
next node in node-id order (treating nodes as a circular list),
starting from the old owner, that is interested in the queue.

Queue consumers start out stopped and are only started when we get
ownership from the cluster.

Thread safety: called by deliver, connection and timer threads, needs locking.

Thread safe object per queue holding queue ownership status.
Called by deliver, connection and timer threads.

class QueueOwnership {
  bool owned;
  Timer timer;
  BrokerQueue q;

  drop(): # locked
    if owned:
      owned = false
      q.stopConsumers()
      mcast release(q.name, false)
      timer.stop()

  take(): # locked
    if not owned:
      owned = true
      q.startConsumers()
      timer.start(timeout)

  timer.fire(): drop()
}

Data members, only modified/examined in deliver thread:
- typedef set<NodeId> ConsumerSet
- map<QueueName, ConsumerSet> consumers
- map<QueueName, NodeId> owner

Thread safe data members, accessed in connection threads (via BrokerHandler):
- map<QueueName, QueueOwnership> ownership

Multicast messages in QueueOwner class:

consume(q):
  if sender == self and consumers[q].empty(): ownership[q].take()
  consumers[q].insert(sender)

release(q):
  assert(owner[q] == sender and owner[q] in consumers[q])
  owner[q] = circular search from sender in consumers[q]
  if owner[q] == self: ownership[q].take()

cancel(q):
  assert(owner[q] != sender) # sender must release() before cancel()
  consumers[q].erase(sender)

member-leaves:
  for q in owner: if owner[q] == left: release(q) as if sent by left
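
A sketch of the circular search for the next interested node (assuming the
ConsumerSet above is an ordered std::set; hypothetical helper, not existing
code):

  #include <set>
  #include <cassert>
  #include <stdint.h>

  typedef uint64_t NodeId;
  typedef std::set<NodeId> ConsumerSet;

  // Next node in node-id order strictly after 'from', wrapping around.
  // Returns 'from' itself only if it is the sole interested node.
  NodeId nextOwner(const ConsumerSet& interested, NodeId from) {
      assert(!interested.empty());
      ConsumerSet::const_iterator i = interested.upper_bound(from);
      if (i == interested.end()) i = interested.begin(); // wrap to lowest id
      return *i;
  }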

Need 2 more intercept points in broker::Cluster:

consume(q,consumer,consumerCount) - Queue::consume()
  if consumerCount == 1: mcast consume(q)

cancel(q,consumer,consumerCount) - Queue::cancel()
  if consumerCount == 0:
    ownership[q].drop()
    mcast cancel(q)

#TODO: lifecycle, updating cluster data structures when queues are destroyed.

*** Increasing concurrency
The major performance limitation of the old cluster is that it does
everything in the single CPG deliver thread context.

We can get additional concurrency by creating a thread context _per queue_
for queue operations: enqueue, acquire, accept etc. (see the sketch after
this section).

We associate a PollableQueue of queue operations with each AMQP queue.
The CPG deliver thread would
- build messages and associate them with cluster IDs.
- push queue ops to the appropriate PollableQueue to be dispatched in that
  queue's thread.

Serializing operations on the same queue avoids contention while taking
advantage of the independence of operations on separate queues.
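
A sketch of the idea (the types and names here are illustrative; the real
PollableQueue has its own API):

  #include <deque>
  #include <map>
  #include <string>

  struct QueueOp { /* enqueue/acquire/accept etc. plus arguments */ };

  // One pending-operation queue per AMQP queue. Each deque is drained by
  // at most one worker thread at a time, so operations on one queue stay
  // ordered while operations on different queues run concurrently.
  std::map<std::string, std::deque<QueueOp> > pendingOps;

  void deliver(const std::string& q, const QueueOp& op) {
      // CPG deliver thread does minimal work: decode and hand off.
      pendingOps[q].push_back(op); // real code: PollableQueue push, which
                                   // also schedules a dispatch for q's ops
  }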

*** Re-use of existing cluster code
- re-use Event
- re-use Multicaster
- re-use the same PollableQueue setup (may experiment later)
- new Core class to replace Cluster.
- keep design modular, keep threading rules clear.

** TODO [#B] Large message replication.
Multicast should encode messages in fixed-size buffers (64k?)
Can't assume we can send a message in one chunk; a sketch of fragmenting
follows below.
For 0-10 we can use channel numbers & send whole frames packed into a larger buffer.
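
A sketch of fragmenting an encoded message into fixed-size multicast events
(64k per the note above; mcastFragment is a hypothetical call, not an
existing API):

  #include <string>
  #include <algorithm>
  #include <stdint.h>

  const size_t CHUNK = 64 * 1024;

  // Hypothetical: multicast one fragment of an encoded message.
  void mcastFragment(uint64_t seq, const char* data, size_t n, bool last);

  // Multicast an encoded message as a sequence of <=64k fragments, tagged
  // with the routing sequence so receivers can reassemble them in order.
  void mcastInChunks(const std::string& encoded, uint64_t routingSeq) {
      for (size_t off = 0; off < encoded.size(); off += CHUNK) {
          size_t n = std::min(CHUNK, encoded.size() - off);
          bool last = (off + n == encoded.size());
          mcastFragment(routingSeq, encoded.data() + off, n, last);
      }
  }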
** TODO [#B] Transaction support.
Extend broker::Cluster interface to capture transaction context and completion.
Sequence number to generate per-node tx IDs.
Replicate transaction completion.
** TODO [#B] Batch CPG multicast messages
The new cluster design involves a lot of small multicast messages; they
need to be batched into larger CPG messages for efficiency, as sketched below.
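
A sketch of the batching idea: append encoded events to a buffer and flush
it as one CPG message when full (the size bound and function names are
assumptions):

  #include <string>

  const size_t MAX_CPG_MSG = 64 * 1024;  // assumed CPG payload bound
  std::string batch;                     // pending encoded events

  void cpgMcast(const std::string& data); // hypothetical single CPG send

  void flushBatch() {
      if (batch.empty()) return;
      cpgMcast(batch);
      batch.clear();
  }

  // Events are self-delimiting when encoded, so they can simply be
  // concatenated and split apart again on delivery.
  void mcastEvent(const std::string& encodedEvent) {
      if (batch.size() + encodedEvent.size() > MAX_CPG_MSG) flushBatch();
      batch += encodedEvent;
  }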
** TODO [#B] Genuine async completion
Replace the current synchronous waiting implementation with genuine async completion.

Test: enhance test_store.cpp to defer enqueueComplete till a special message is received.

Async callback uses *requestIOProcessing* to queue the action on the IO thread.

** TODO [#B] Async completion of accept when dequeue completes.
The interface is already there on broker::Message, we just need to ensure
that store and cluster implementations call it appropriately.

** TODO [#B] Replicate wiring.
From messageStore create/destroy/bind, replicate the encoded declare/destroy/bind command.

** TODO [#B] New members joining - first pass

Re-use update code from the old cluster but don't replicate sessions &
connections.

Need to extend it to send cluster IDs with messages.

Need to replicate the queue ownership data as part of the update.

** TODO [#B] Persistence support.
InitialStatus protocol etc. to support persistent start-up (existing code).

Only one broker recovers from store, then updates the others.

Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.

** TODO [#B] Handle other ways that messages can leave a queue.

Other ways (other than via a consumer) that messages are taken off a queue.

NOTE: Not controlled by the queue lock, how to make them consistent?

For purge/destroy the target broker may not have all the messages that are
on other brokers.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
- Queue::purge() - replicate purge? or just delete what's on this broker?
- Queue::destroy() - messages to alternate exchange on all brokers?

Need to add callpoints & mcast messages to replicate these?

** TODO [#B] Flow control for internal queues.

Need to bound the size of internal queues: delivery and multicast.
- stop polling for read on client connections when we reach a bound.
- restart polling when we get back under it.

That will stop local multicasting; we still have to deal with remote
multicasting (note the existing cluster does not do this). Something like
the sketch below:
- when over the bound, multicast a flow-control event.
- on delivery of flow-control, all members stop polling to read client connections.
- when back under the bound, send flow-control-end; all members resume.
- if the flow-controlling member dies, others resume.
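
A sketch of the bounds check with high/low watermarks so flow control does
not oscillate (the thresholds and names are assumptions):

  #include <stddef.h>

  const size_t HIGH = 10000, LOW = 5000;  // assumed watermarks
  bool flowStopped = false;

  void mcastFlowControl(bool stop);       // hypothetical flow-control event

  // Called whenever an internal queue's depth changes.
  void checkFlow(size_t depth) {
      if (!flowStopped && depth > HIGH) {
          flowStopped = true;
          mcastFlowControl(true);         // members stop reading clients
      } else if (flowStopped && depth < LOW) {
          flowStopped = false;
          mcastFlowControl(false);        // members resume reading
      }
  }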

** TODO [#B] Integration with transactions.
Do we want to replicate during the transaction & replicate commit/rollback,
or replicate only on commit?
No integration with DTX transactions.
** TODO [#B] Make new cluster work with replication exchange.
Possibly re-use some common logic. Replication exchange is like clustering
except over TCP.
** TODO [#B] Better concurrency, scalability on multi-cores.
Introduce a PollableQueue of operations per broker queue. Queue up mcast
operations (enqueue, acquire, accept etc.) to be handled concurrently
on different queues. Performance testing to verify improved scalability.
** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
Cluster needs to complete these asynchronously to guarantee resources
exist across the cluster when the command completes.

** TODO [#C] Allow non-replicated exchanges, queues.

Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
- save replicated status to store.
- support in management tools.
Replicated exchange: replicate binds to replicated queues.
Replicated queue: replicate all messages.

** TODO [#C] New members joining - improved.

Replicate wiring like the old cluster: stall for wiring but not for
messages. Update messages on a per-queue basis from back to front.

Updater:
- stall & push wiring: declare exchanges, queues, bindings.
- start an update iterator thread on each queue.
- unstall and process normally while the iterator threads run.

Update iterator thread:
- starts at the back of the updater's queue, at message m.
- send update_front(q,m) to the updatee and advance towards the front.
- at the front: send update_done(q).

Updatee (sketched below):
- stall, receive wiring, lock all queues, mark queues "updating", unstall.
- update_front(q,m): push m to the *front* of q.
- update_done(q): mark queue "ready".

Updatee cannot take the queue consume lock for a queue that is updating.
Updatee *can* push messages onto a queue that is updating.
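
A sketch of the updatee side of this protocol (the handler and member names
are hypothetical):

  // Messages arrive from the back of the updater's queue towards the
  // front, so each one is pushed to the *front* of the local queue,
  // ahead of those already received.
  void updateFront(const QueueName& q, const Message& m) {
      getQueue(q)->pushFront(m);   // hypothetical front-insert operation
  }

  // The queue becomes consumable only once the update is complete; until
  // then the consume lock cannot be taken for it.
  void updateDone(const QueueName& q) {
      updating.erase(q);           // clears the "updating" mark
  }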

TODO: Is there any way to eliminate the stall for wiring?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "Queue observers" with intercept
points in similar places:
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code.

In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
** TODO [#C] Concurrency for enqueue events.
All enqueue events are processed in the CPG deliver thread context, which
serializes all the work. We only need ordering on a per-queue basis: can we
enqueue in parallel on different queues, and will that improve performance?
** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in the decision to deliver an immediate message?