blob: 2d3f6f922c037019bde7cb3da160e7be0cfdc1e7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "unit_test.h"
#include "qpid/legacystore/MessageStoreImpl.h"
#include <iostream>
#include "MessageUtils.h"
#include "qpid/legacystore/StoreException.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
using namespace mrg::msgstore;
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace std;
namespace {
qpid::broker::Broker::Options opts;
qpid::broker::Broker br(opts);
}
QPID_AUTO_TEST_SUITE(TransactionalTest)
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
opts.selectors.push_back(level); \
qpid::log::Logger::instance().configure(opts);
const string test_filename("TransactionalTest");
const char* tdp = getenv("TMP_DATA_DIR");
const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
class TestTxnCtxt : public TxnCtxt
{
public:
TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
void setCompleteFailure(const unsigned num_queues_rem) {
// Remove queue members from back of impactedQueues until queues_rem reamin.
// to end to simulate multi-queue txn complete failure.
while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
}
void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
};
// Test store which has special begin() which returns a TestTPCTxnCtxt, and a method to check for
// remaining open transactions.
// begin(), commit(), and abort() all hide functions in MessageStoreImpl. To avoid the compiler
// warnings/errors these are renamed with a 'TMS' prefix.
class TestMessageStore: public MessageStoreImpl
{
public:
TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {}
std::auto_ptr<qpid::broker::TransactionContext> TMSbegin() {
checkInit();
// pass sequence number for c/a
return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
}
void TMScommit(TransactionContext& ctxt, const bool complete_prepared_list) {
checkInit();
TxnCtxt* txn(check(&ctxt));
if (!txn->isTPC()) {
localPrepare(dynamic_cast<TxnCtxt*>(txn));
if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
}
completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void TMSabort(TransactionContext& ctxt, const bool complete_prepared_list)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (!txn->isTPC()) {
localPrepare(dynamic_cast<TxnCtxt*>(txn));
if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
}
completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
};
// === Helper fns ===
const string nameA("queueA");
const string nameB("queueB");
//const Uuid messageId(true);
std::auto_ptr<MessageStoreImpl> store;
std::auto_ptr<QueueRegistry> queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
template <class T>
void setup()
{
store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
queueA->create();
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
queueB->create();
}
template <class T>
void restart()
{
queueA.reset();
queueB.reset();
queues.reset();
store.reset();
store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1);
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
LinkRegistry links;
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry());
store->recover(recovery);
queueA = queues->find(nameA);
queueB = queues->find(nameB);
}
Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
{
return MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
}
void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL(size, queue->getMessageCount());
if (size > 0) {
Message msg = MessageUtils::get(*queue);
BOOST_REQUIRE(msg);
BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
}
}
void swap(bool commit)
{
setup<MessageStoreImpl>();
//create message and enqueue it onto first queue:
Message msgA = createMessage("Message", "exchange", "routing_key");
queueA->deliver(msgA);
QueueCursor cursorB;
Message msgB = MessageUtils::get(*queueA, &cursorB);
BOOST_REQUIRE(msgB);
//move the message from one queue to the other as a transaction
std::auto_ptr<TransactionContext> txn = store->begin();
TxBuffer tx;
queueB->deliver(msgB, &tx);//note: need to enqueue it first to avoid message being deleted
queueA->dequeue(txn.get(), cursorB);
tx.prepare(txn.get());
if (commit) {
store->commit(*txn);
} else {
store->abort(*txn);
}
restart<MessageStoreImpl>();
// Check outcome
BOOST_REQUIRE(queueA);
BOOST_REQUIRE(queueB);
Queue::shared_ptr x;//the queue from which the message was swapped
Queue::shared_ptr y;//the queue on which the message is expected to be
if (commit) {
x = queueA;
y = queueB;
} else {
x = queueB;
y = queueA;
}
checkMsg(x, 0);
checkMsg(y, 1, "Message");
checkMsg(y, 0);
}
void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
{
setup<TestMessageStore>();
TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
std::auto_ptr<TransactionContext> txn(tmsp->TMSbegin());
TxBuffer tx;
//create two messages and enqueue them onto both queues:
Message msgA = createMessage("MessageA", "exchange", "routing_key");
queueA->deliver(msgA, &tx);
queueB->deliver(msgA, &tx);
Message msgB = createMessage("MessageB", "exchange", "routing_key");
queueA->deliver(msgB, &tx);
queueB->deliver(msgB, &tx);
tx.prepare(txn.get());
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
tmsp->TMScommit(*txn, complete_prepared_list);
else
tmsp->TMSabort(*txn, complete_prepared_list);
restart<TestMessageStore>();
// Check outcome
if (commit)
{
checkMsg(queueA, 2, "MessageA");
checkMsg(queueB, 2, "MessageA");
checkMsg(queueA, 1, "MessageB");
checkMsg(queueB, 1, "MessageB");
}
checkMsg(queueA, 0);
checkMsg(queueB, 0);
}
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
{
SET_LOG_LEVEL("error+"); // This only needs to be set once.
cout << test_filename << ".Commit: " << flush;
swap(true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(Abort)
{
cout << test_filename << ".Abort: " << flush;
swap(false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueCommit)
{
cout << test_filename << ".MultiQueueCommit: " << flush;
testMultiQueueTxn(2, true, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAbort)
{
cout << test_filename << ".MultiQueueAbort: " << flush;
testMultiQueueTxn(2, true, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
{
cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
testMultiQueueTxn(0, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
{
cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
testMultiQueueTxn(0, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
{
cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
testMultiQueueTxn(1, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
{
cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
testMultiQueueTxn(1, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
{
cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
testMultiQueueTxn(2, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
{
cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
testMultiQueueTxn(2, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(LockedRecordTest)
{
cout << test_filename << ".LockedRecordTest: " << flush;
setup<MessageStoreImpl>();
queueA->deliver(createMessage("Message", "exchange", "routingKey"));
std::auto_ptr<TransactionContext> txn = store->begin();
QueueCursor cursor;
Message msg = MessageUtils::get(*queueA, &cursor);
queueA->dequeue(txn.get(), cursor);
try {
store->dequeue(0, msg.getPersistentContext(), *queueA);
BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
}
catch (const mrg::msgstore::StoreException& e) {
if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
BOOST_ERROR("Unexpected StoreException: " << e.what());
}
catch (const std::exception& e) {
BOOST_ERROR("Unexpected exception: " << e.what());
}
store->commit(*txn);
checkMsg(queueA, 0);
cout << "ok" << endl;
}
QPID_AUTO_TEST_SUITE_END()