| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/broker/RecoveryManagerImpl.h" |
| |
| #include "qpid/broker/Message.h" |
| #include "qpid/broker/Queue.h" |
| #include "qpid/broker/Link.h" |
| #include "qpid/broker/Bridge.h" |
| #include "qpid/broker/RecoveredEnqueue.h" |
| #include "qpid/broker/RecoveredDequeue.h" |
| #include "qpid/framing/reply_exceptions.h" |
| |
| using boost::dynamic_pointer_cast; |
| using boost::intrusive_ptr; |
| using std::string; |
| |
| namespace qpid { |
| namespace broker { |
| |
| RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, |
| DtxManager& _dtxMgr) |
| : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} |
| |
| RecoveryManagerImpl::~RecoveryManagerImpl() {} |
| |
| class RecoverableMessageImpl : public RecoverableMessage |
| { |
| intrusive_ptr<Message> msg; |
| public: |
| RecoverableMessageImpl(const intrusive_ptr<Message>& _msg); |
| ~RecoverableMessageImpl() {}; |
| void setPersistenceId(uint64_t id); |
| void setRedelivered(); |
| bool loadContent(uint64_t available); |
| void decodeContent(framing::Buffer& buffer); |
| void recover(Queue::shared_ptr queue); |
| void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); |
| void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); |
| }; |
| |
| class RecoverableQueueImpl : public RecoverableQueue |
| { |
| Queue::shared_ptr queue; |
| public: |
| RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} |
| ~RecoverableQueueImpl() {}; |
| void setPersistenceId(uint64_t id); |
| uint64_t getPersistenceId() const; |
| const std::string& getName() const; |
| void setExternalQueueStore(ExternalQueueStore* inst); |
| ExternalQueueStore* getExternalQueueStore() const; |
| void recover(RecoverableMessage::shared_ptr msg); |
| void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); |
| void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); |
| }; |
| |
| class RecoverableExchangeImpl : public RecoverableExchange |
| { |
| Exchange::shared_ptr exchange; |
| QueueRegistry& queues; |
| public: |
| RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} |
| void setPersistenceId(uint64_t id); |
| void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); |
| }; |
| |
| class RecoverableConfigImpl : public RecoverableConfig |
| { |
| Link::shared_ptr link; |
| Bridge::shared_ptr bridge; |
| public: |
| RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} |
| RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {} |
| void setPersistenceId(uint64_t id); |
| }; |
| |
| class RecoverableTransactionImpl : public RecoverableTransaction |
| { |
| DtxBuffer::shared_ptr buffer; |
| public: |
| RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {} |
| void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); |
| void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); |
| }; |
| |
| RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) |
| { |
| Exchange::shared_ptr e = Exchange::decode(exchanges, buffer); |
| if (e) { |
| return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(e, queues)); |
| } else { |
| return RecoverableExchange::shared_ptr(); |
| } |
| } |
| |
| RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) |
| { |
| Queue::shared_ptr queue = Queue::restore(queues, buffer); |
| try { |
| Exchange::shared_ptr exchange = exchanges.getDefault(); |
| if (exchange) { |
| exchange->bind(queue, queue->getName(), 0); |
| queue->bound(exchange->getName(), queue->getName(), framing::FieldTable()); |
| } |
| } catch (const framing::NotFoundException& /*e*/) { |
| //assume no default exchange has been declared |
| } |
| return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue)); |
| } |
| |
| RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) |
| { |
| boost::intrusive_ptr<Message> message(new Message()); |
| message->decodeHeader(buffer); |
| return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); |
| } |
| |
| RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, |
| std::auto_ptr<TPCTransactionContext> txn) |
| { |
| DtxBuffer::shared_ptr buffer(new DtxBuffer()); |
| dtxMgr.recover(xid, txn, buffer); |
| return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); |
| } |
| |
| RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) |
| { |
| string kind; |
| |
| buffer.getShortString (kind); |
| if (kind == "link") |
| return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); |
| else if (kind == "bridge") |
| return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); |
| |
| return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead |
| } |
| |
| void RecoveryManagerImpl::recoveryComplete() |
| { |
| //notify all queues and exchanges |
| queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges))); |
| exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); |
| } |
| |
| RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg) |
| { |
| if (!msg->isPersistent()) { |
| msg->forcePersistent(); // set so that message will get dequeued from store. |
| } |
| } |
| |
| bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) |
| { |
| return true; |
| } |
| |
| void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) |
| { |
| msg->decodeContent(buffer); |
| } |
| |
| void RecoverableMessageImpl::recover(Queue::shared_ptr queue) |
| { |
| queue->recover(msg); |
| } |
| |
| void RecoverableMessageImpl::setPersistenceId(uint64_t id) |
| { |
| msg->setPersistenceId(id); |
| } |
| |
| void RecoverableMessageImpl::setRedelivered() |
| { |
| msg->redeliver(); |
| } |
| |
| void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) |
| { |
| dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); |
| } |
| |
| void RecoverableQueueImpl::setPersistenceId(uint64_t id) |
| { |
| queue->setPersistenceId(id); |
| } |
| |
| uint64_t RecoverableQueueImpl::getPersistenceId() const |
| { |
| return queue->getPersistenceId(); |
| } |
| |
| const std::string& RecoverableQueueImpl::getName() const |
| { |
| return queue->getName(); |
| } |
| |
| void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) |
| { |
| queue->setExternalQueueStore(inst); |
| } |
| |
| ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const |
| { |
| return queue->getExternalQueueStore(); |
| } |
| |
| void RecoverableExchangeImpl::setPersistenceId(uint64_t id) |
| { |
| exchange->setPersistenceId(id); |
| } |
| |
| void RecoverableConfigImpl::setPersistenceId(uint64_t id) |
| { |
| if (link.get()) |
| link->setPersistenceId(id); |
| else if (bridge.get()) |
| bridge->setPersistenceId(id); |
| } |
| |
| void RecoverableExchangeImpl::bind(const string& queueName, |
| const string& key, |
| framing::FieldTable& args) |
| { |
| Queue::shared_ptr queue = queues.find(queueName); |
| exchange->bind(queue, key, &args); |
| queue->bound(exchange->getName(), key, args); |
| } |
| |
| void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) |
| { |
| buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); |
| } |
| |
| void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) |
| { |
| buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); |
| } |
| |
| void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) |
| { |
| dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue); |
| } |
| |
| void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) |
| { |
| dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue); |
| } |
| |
| void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) |
| { |
| dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message); |
| } |
| |
| void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) |
| { |
| dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message); |
| } |
| |
| }} |