| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "unit_test.h" |
| |
| #include "qpid/legacystore/MessageStoreImpl.h" |
| #include <iostream> |
| #include "MessageUtils.h" |
| #include "qpid/broker/Queue.h" |
| #include "qpid/broker/RecoveryManagerImpl.h" |
| #include "qpid/framing/AMQHeaderBody.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/legacystore/TxnCtxt.h" |
| #include "qpid/log/Logger.h" |
| #include "qpid/sys/Timer.h" |
| |
| using namespace mrg::msgstore; |
| using namespace qpid; |
| using namespace qpid::broker; |
| using namespace qpid::framing; |
| using namespace std; |
| |
| |
| qpid::broker::Broker::Options opts; |
| qpid::broker::Broker br(opts); |
| |
| |
| QPID_AUTO_TEST_SUITE(TwoPhaseCommitTest) |
| |
| #define SET_LOG_LEVEL(level) \ |
| qpid::log::Options opts(""); \ |
| opts.selectors.clear(); \ |
| opts.selectors.push_back(level); \ |
| qpid::log::Logger::instance().configure(opts); |
| |
| |
| const string test_filename("TwoPhaseCommitTest"); |
| const char* tdp = getenv("TMP_DATA_DIR"); |
| string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TwoPhaseCommitTest"); |
| |
| // === Helper fns === |
| |
| class TwoPhaseCommitTest |
| { |
| |
| class Strategy |
| { |
| public: |
| virtual void init() = 0; |
| virtual void run(TPCTransactionContext* txn) = 0; |
| virtual void check(bool committed) = 0; |
| virtual ~Strategy(){} |
| }; |
| |
| class Swap : public Strategy |
| { |
| TwoPhaseCommitTest* const test; |
| const string messageId; |
| Message msg; |
| public: |
| Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {} |
| void init(){ msg = test->deliver(messageId, test->queueA); } |
| void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); } |
| void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); } |
| }; |
| |
| class Enqueue : public Strategy |
| { |
| TwoPhaseCommitTest* const test; |
| Message msg1; |
| Message msg2; |
| Message msg3; |
| public: |
| Enqueue(TwoPhaseCommitTest* const test_): test(test_) {} |
| void init() {} |
| void run(TPCTransactionContext* txn) { |
| msg1 = test->enqueue(txn, "Enqueue1", test->queueA); |
| msg2 = test->enqueue(txn, "Enqueue2", test->queueA); |
| msg3 = test->enqueue(txn, "Enqueue3", test->queueA); |
| } |
| void check(bool committed) { |
| if (committed) { |
| test->checkMsg(test->queueA, 3, "Enqueue1"); |
| test->checkMsg(test->queueA, 2, "Enqueue2"); |
| test->checkMsg(test->queueA, 1, "Enqueue3"); |
| } |
| test->checkMsg(test->queueA, 0); |
| } |
| }; |
| |
| class Dequeue : public Strategy |
| { |
| TwoPhaseCommitTest* const test; |
| Message msg1; |
| Message msg2; |
| Message msg3; |
| public: |
| Dequeue(TwoPhaseCommitTest* const test_): test(test_) {} |
| void init() { |
| msg1 = test->deliver("Dequeue1", test->queueA); |
| msg2 = test->deliver("Dequeue2", test->queueA); |
| msg3 = test->deliver("Dequeue3", test->queueA); |
| } |
| void run(TPCTransactionContext* txn) { |
| test->dequeue(txn, test->queueA); |
| test->dequeue(txn, test->queueA); |
| test->dequeue(txn, test->queueA); |
| } |
| void check(bool committed) { |
| if (!committed) { |
| test->checkMsg(test->queueA, 3, "Dequeue1"); |
| test->checkMsg(test->queueA, 2, "Dequeue2"); |
| test->checkMsg(test->queueA, 1, "Dequeue3"); |
| } |
| test->checkMsg(test->queueA, 0); |
| } |
| }; |
| |
| class MultiQueueTxn : public Strategy |
| { |
| TwoPhaseCommitTest* const test; |
| Message msg1; |
| Message msg2; |
| std::set<Queue::shared_ptr> queueset; |
| public: |
| MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {} |
| virtual void init() {} |
| virtual void run(TPCTransactionContext* txn) { |
| queueset.insert(test->queueA); |
| queueset.insert(test->queueB); |
| msg1 = test->enqueue(txn, "Message1", queueset); |
| msg2 = test->enqueue(txn, "Message2", queueset); |
| queueset.clear(); |
| } |
| virtual void check(bool committed) { |
| TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get()); |
| if (committed) |
| { |
| test->checkMsg(test->queueA, 2, "Message1"); |
| test->checkMsg(test->queueB, 2, "Message1"); |
| test->checkMsg(test->queueA, 1, "Message2"); |
| test->checkMsg(test->queueB, 1, "Message2"); |
| } |
| test->checkMsg(test->queueA, 0); |
| test->checkMsg(test->queueB, 0); |
| // Check there are no remaining open txns in store |
| BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA))); |
| BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB))); |
| BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns()); |
| } |
| }; |
| |
| // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning |
| class TestTPCTxnCtxt : public TPCTxnCtxt |
| { |
| public: |
| TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {} |
| void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) { |
| // Remove queue members from back of impactedQueues until queues_rem reamin. |
| // to end to simulate multi-queue txn complete failure. |
| while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin()); |
| // If prepared list is not to be committed, set pointer to 0 |
| if (!complete_prepared_list) preparedXidStorePtr = 0; |
| } |
| }; |
| |
| // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for |
| // reamining open transactions |
| class TestMessageStore: public MessageStoreImpl |
| { |
| public: |
| TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {} |
| std::auto_ptr<qpid::broker::TPCTransactionContext> TMSbegin(const std::string& xid) { |
| checkInit(); |
| IdSequence* jtx = &messageIdSequence; |
| // pass sequence number for c/a |
| return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx)); |
| } |
| u_int32_t getRemainingTxns(const PersistableQueue& queue) { |
| return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt(); |
| } |
| u_int32_t getRemainingPreparedListTxns() { |
| return tplStorePtr->get_open_txn_cnt(); |
| } |
| }; |
| |
| const string nameA; |
| const string nameB; |
| std::auto_ptr<MessageStoreImpl> store; |
| std::auto_ptr<DtxManager> dtxmgr; |
| std::auto_ptr<QueueRegistry> queues; |
| std::auto_ptr<LinkRegistry> links; |
| Queue::shared_ptr queueA; |
| Queue::shared_ptr queueB; |
| Message msg1; |
| Message msg2; |
| Message msg4; |
| std::auto_ptr<TxBuffer> tx; |
| |
| void recoverPrepared(bool commit) |
| { |
| setup<MessageStoreImpl>(); |
| |
| Swap swap(this, "RecoverPrepared"); |
| swap.init(); |
| std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); |
| swap.run(txn.get()); |
| if (tx.get()) { |
| tx->prepare(txn.get()); |
| tx.reset(); |
| } |
| |
| store->prepare(*txn); |
| restart<MessageStoreImpl>(); |
| |
| //check that the message is not available from either queue |
| BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount()); |
| BOOST_CHECK_EQUAL((u_int32_t) 0, queueB->getMessageCount()); |
| |
| //commit/abort the txn - through the dtx manager, not directly on the store |
| if (commit) { |
| dtxmgr->commit("my-xid", false); |
| } else { |
| dtxmgr->rollback("my-xid"); |
| } |
| |
| swap.check(commit); |
| restart<MessageStoreImpl>(); |
| swap.check(commit); |
| } |
| |
| void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit) |
| { |
| setup<TestMessageStore>(); |
| MultiQueueTxn mqtTest(this); |
| mqtTest.init(); |
| std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid")); |
| mqtTest.run(txn.get()); |
| if (tx.get()) { |
| tx->prepare(txn.get()); |
| tx.reset(); |
| } |
| store->prepare(*txn); |
| |
| // As the commits and aborts should happen through DtxManager, and it is too complex to |
| // pass all these test params through, we bypass DtxManager and use the store directly. |
| // This will prevent the queues from seeing committed txns, however. To test the success |
| // or failure of |
| static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list); |
| if (commit) |
| store->commit(*txn); |
| else |
| store->abort(*txn); |
| restart<TestMessageStore>(); |
| mqtTest.check(commit); |
| } |
| |
| void commit(Strategy& strategy) |
| { |
| setup<MessageStoreImpl>(); |
| strategy.init(); |
| |
| std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); |
| strategy.run(txn.get()); |
| if (tx.get()) { |
| tx->prepare(txn.get()); |
| tx.reset(); |
| } |
| store->prepare(*txn); |
| store->commit(*txn); |
| restart<MessageStoreImpl>(); |
| strategy.check(true); |
| } |
| |
| void abort(Strategy& strategy, bool prepare) |
| { |
| setup<MessageStoreImpl>(); |
| strategy.init(); |
| |
| std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid")); |
| strategy.run(txn.get()); |
| if (tx.get()) { |
| tx->prepare(txn.get()); |
| tx.reset(); |
| } |
| if (prepare) store->prepare(*txn); |
| store->abort(*txn); |
| restart<MessageStoreImpl>(); |
| strategy.check(false); |
| } |
| |
| void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to) |
| { |
| QueueCursor c; |
| Message msg1 = MessageUtils::get(*from, &c);//just dequeues in memory |
| //move the message from one queue to the other as part of a |
| //distributed transaction |
| if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); |
| to->deliver(msg1, tx.get());//note: need to enqueue it first to avoid message being deleted |
| from->dequeue(txn, c); |
| } |
| |
| void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue) |
| { |
| QueueCursor c; |
| Message msg2 = MessageUtils::get(*queue, &c);//just dequeues in memory |
| queue->dequeue(txn, c); |
| } |
| |
| Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, Queue::shared_ptr& queue) |
| { |
| Message msg = createMessage(msgid); |
| if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); |
| queue->deliver(msg, tx.get()); |
| return msg; |
| } |
| |
| Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, std::set<Queue::shared_ptr>& queueset) |
| { |
| if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer); |
| Message msg = createMessage(msgid); |
| for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) { |
| (*i)->deliver(msg, tx.get()); |
| } |
| return msg; |
| } |
| |
| Message deliver(const string& msgid, Queue::shared_ptr& queue) |
| { |
| Message m = createMessage(msgid); |
| queue->deliver(m); |
| return m; |
| } |
| |
| template <class T> |
| void setup() |
| { |
| store = std::auto_ptr<T>(new T(&br)); |
| store->init(test_dir, 4, 1, true); // truncate store |
| |
| //create two queues: |
| queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0)); |
| queueA->create(); |
| queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0)); |
| queueB->create(); |
| } |
| |
| Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key") |
| { |
| Message msg = MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id); |
| return msg; |
| } |
| |
| template <class T> |
| void restart() |
| { |
| queueA.reset(); |
| queueB.reset(); |
| store.reset(); |
| queues.reset(); |
| links.reset(); |
| |
| store = std::auto_ptr<T>(new T(&br)); |
| store->init(test_dir, 4, 1); |
| sys::Timer t; |
| ExchangeRegistry exchanges; |
| queues = std::auto_ptr<QueueRegistry>(new QueueRegistry); |
| links = std::auto_ptr<LinkRegistry>(new LinkRegistry); |
| dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t)); |
| dtxmgr->setStore (store.get()); |
| RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry()); |
| store->recover(recovery); |
| |
| queueA = queues->find(nameA); |
| queueB = queues->find(nameB); |
| } |
| |
| void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>") |
| { |
| BOOST_REQUIRE(queue); |
| BOOST_CHECK_EQUAL(size, queue->getMessageCount()); |
| if (size > 0) { |
| Message msg = MessageUtils::get(*queue); |
| BOOST_REQUIRE(msg); |
| BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg)); |
| } |
| } |
| |
| void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to) |
| { |
| BOOST_REQUIRE(from); |
| BOOST_REQUIRE(to); |
| |
| Queue::shared_ptr x; //the queue from which the message was swapped |
| Queue::shared_ptr y; //the queue on which the message is expected to be |
| |
| if (swapped) { |
| x = from; |
| y = to; |
| } else { |
| x = to; |
| y = from; |
| } |
| |
| checkMsg(x, 0); |
| checkMsg(y, 1, msgid); |
| checkMsg(y, 0); |
| } |
| |
| public: |
| TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {} |
| |
| void testCommitEnqueue() |
| { |
| Enqueue enqueue(this); |
| commit(enqueue); |
| } |
| |
| void testCommitDequeue() |
| { |
| Dequeue dequeue(this); |
| commit(dequeue); |
| } |
| |
| void testCommitSwap() |
| { |
| Swap swap(this, "SwapMessageId"); |
| commit(swap); |
| } |
| |
| void testPrepareAndAbortEnqueue() |
| { |
| Enqueue enqueue(this); |
| abort(enqueue, true); |
| } |
| |
| void testPrepareAndAbortDequeue() |
| { |
| Dequeue dequeue(this); |
| abort(dequeue, true); |
| } |
| |
| void testPrepareAndAbortSwap() |
| { |
| Swap swap(this, "SwapMessageId"); |
| abort(swap, true); |
| } |
| |
| void testAbortNoPrepareEnqueue() |
| { |
| Enqueue enqueue(this); |
| abort(enqueue, false); |
| } |
| |
| void testAbortNoPrepareDequeue() |
| { |
| Dequeue dequeue(this); |
| abort(dequeue, false); |
| } |
| |
| void testAbortNoPrepareSwap() |
| { |
| Swap swap(this, "SwapMessageId"); |
| abort(swap, false); |
| } |
| |
| void testRecoverPreparedThenCommitted() |
| { |
| recoverPrepared(true); |
| } |
| |
| void testRecoverPreparedThenAborted() |
| { |
| recoverPrepared(false); |
| } |
| |
| void testMultiQueueCommit() |
| { |
| testMultiQueueTxn(2, true, true); |
| } |
| |
| void testMultiQueueAbort() |
| { |
| testMultiQueueTxn(2, true, false); |
| } |
| |
| void testMultiQueueNoQueueCommitRecover() |
| { |
| testMultiQueueTxn(0, false, true); |
| } |
| |
| void testMultiQueueNoQueueAbortRecover() |
| { |
| testMultiQueueTxn(0, false, false); |
| } |
| |
| void testMultiQueueSomeQueueCommitRecover() |
| { |
| testMultiQueueTxn(1, false, true); |
| } |
| |
| void testMultiQueueSomeQueueAbortRecover() |
| { |
| testMultiQueueTxn(1, false, false); |
| } |
| |
| void testMultiQueueAllQueueCommitRecover() |
| { |
| testMultiQueueTxn(2, false, true); |
| } |
| |
| void testMultiQueueAllQueueAbortRecover() |
| { |
| testMultiQueueTxn(2, false, false); |
| } |
| }; |
| |
| TwoPhaseCommitTest tpct; |
| |
| // === Test suite === |
| |
| QPID_AUTO_TEST_CASE(CommitEnqueue) |
| { |
| SET_LOG_LEVEL("error+"); // This only needs to be set once. |
| |
| cout << test_filename << ".CommitEnqueue: " << flush; |
| tpct.testCommitEnqueue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(CommitDequeue) |
| { |
| cout << test_filename << ".CommitDequeue: " << flush; |
| tpct.testCommitDequeue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(CommitSwap) |
| { |
| cout << test_filename << ".CommitSwap: " << flush; |
| tpct.testCommitSwap(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue) |
| { |
| cout << test_filename << ".PrepareAndAbortEnqueue: " << flush; |
| tpct.testPrepareAndAbortEnqueue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue) |
| { |
| cout << test_filename << ".PrepareAndAbortDequeue: " << flush; |
| tpct.testPrepareAndAbortDequeue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(PrepareAndAbortSwap) |
| { |
| cout << test_filename << ".PrepareAndAbortSwap: " << flush; |
| tpct.testPrepareAndAbortSwap(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue) |
| { |
| cout << test_filename << ".AbortNoPrepareEnqueue: " << flush; |
| tpct.testAbortNoPrepareEnqueue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue) |
| { |
| cout << test_filename << ".AbortNoPrepareDequeue: " << flush; |
| tpct.testAbortNoPrepareDequeue(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(AbortNoPrepareSwap) |
| { |
| cout << test_filename << ".AbortNoPrepareSwap: " << flush; |
| tpct.testAbortNoPrepareSwap(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted) |
| { |
| cout << test_filename << ".RecoverPreparedThenCommitted: " << flush; |
| tpct.testRecoverPreparedThenCommitted(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted) |
| { |
| cout << test_filename << ".RecoverPreparedThenAborted: " << flush; |
| tpct.testRecoverPreparedThenAborted(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueCommit) |
| { |
| cout << test_filename << ".MultiQueueCommit: " << flush; |
| tpct.testMultiQueueCommit(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueAbort) |
| { |
| cout << test_filename << ".MultiQueueAbort: " << flush; |
| tpct.testMultiQueueAbort(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover) |
| { |
| cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush; |
| tpct.testMultiQueueNoQueueCommitRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover) |
| { |
| cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush; |
| tpct.testMultiQueueNoQueueAbortRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover) |
| { |
| cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush; |
| tpct.testMultiQueueSomeQueueCommitRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover) |
| { |
| cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush; |
| tpct.testMultiQueueSomeQueueAbortRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover) |
| { |
| cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush; |
| tpct.testMultiQueueAllQueueCommitRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover) |
| { |
| cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush; |
| tpct.testMultiQueueAllQueueAbortRecover(); |
| cout << "ok" << endl; |
| } |
| |
| QPID_AUTO_TEST_SUITE_END() |