blob: a49333d876a8b15dbf94797e1320232626c118b2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "unit_test.h"
#include "qpid/legacystore/MessageStoreImpl.h"
#include <iostream>
#include "tests/legacystore/MessageUtils.h"
#include "qpid/legacystore/StoreException.h"
#include "qpid/broker/DirectExchange.h"
#include <qpid/broker/Queue.h>
#include <qpid/broker/QueueSettings.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/FieldTable.h>
#include <qpid/framing/FieldValue.h>
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
qpid::broker::Broker::Options opts;
qpid::broker::Broker br(opts);
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
opts.selectors.push_back(level); \
qpid::log::Logger::instance().configure(opts);
using boost::intrusive_ptr;
using boost::static_pointer_cast;
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace mrg::msgstore;
using namespace std;
QPID_AUTO_TEST_SUITE(SimpleTest)
const string test_filename("SimpleTest");
const char* tdp = getenv("TMP_DATA_DIR");
const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
// === Helper fns ===
struct DummyHandler : OutputHandler
{
std::vector<AMQFrame> frames;
virtual void send(AMQFrame& frame){
frames.push_back(frame);
}
};
void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
{
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (&store);
RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
store.recover(recovery);
}
void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges)
{
QueueRegistry queues;
LinkRegistry links;
recover(store, queues, exchanges, links);
}
void recover(MessageStoreImpl& store, QueueRegistry& queues)
{
ExchangeRegistry exchanges;
LinkRegistry links;
recover(store, queues, exchanges, links);
}
void bindAndUnbind(const string& exchangeName, const string& queueName,
const string& key, const FieldTable& args)
{
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
store.create(*exchange, qpid::framing::FieldTable());
store.create(*queue, qpid::framing::FieldTable());
BOOST_REQUIRE(exchange->bind(queue, key, &args));
store.bind(*exchange, *queue, key, args);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
Queue::shared_ptr queue = queues.find(queueName);
// check exchange args are still set
for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
}
//check it is bound by unbinding
BOOST_REQUIRE(exchange->unbind(queue, key, &args));
store.unbind(*exchange, *queue, key, args);
}
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
Queue::shared_ptr queue = queues.find(queueName);
// check exchange args are still set
for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
}
//make sure it is no longer bound
BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
}
}
// === Test suite ===
QPID_AUTO_TEST_CASE(CreateDelete)
{
SET_LOG_LEVEL("error+"); // This only needs to be set once.
cout << test_filename << ".CreateDelete: " << flush;
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
// TODO - check dir exists
BOOST_REQUIRE(queue.getPersistenceId());
store.destroy(queue);
// TODO - check dir is deleted
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(EmptyRecover)
{
cout << test_filename << ".EmptyRecover: " << flush;
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
//nothing to assert, just testing it doesn't blow up
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(QueueCreate)
{
cout << test_filename << ".QueueCreate: " << flush;
uint64_t id(0);
string name("MyDurableQueue");
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
BOOST_REQUIRE(queue.getPersistenceId());
id = queue.getPersistenceId();
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue.get());
BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
{
cout << test_filename << ".QueueCreateWithSettings: " << flush;
FieldTable arguments;
arguments.setInt("qpid.max_count", 202);
arguments.setInt("qpid.max_size", 1003);
QueueSettings settings;
settings.populate(arguments, settings.storeSettings);
string name("MyDurableQueue");
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, settings, &store, 0);
queue.create();
BOOST_REQUIRE(queue.getPersistenceId());
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(QueueDestroy)
{
cout << test_filename << ".QueueDestroy: " << flush;
string name("MyDurableQueue");
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
BOOST_REQUIRE(!registry.find(name));
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(Enqueue)
{
cout << test_filename << ".Enqueue: " << flush;
//TODO: this is largely copy & paste'd from MessageTest in
//qpid tree. ideally need some helper routines for reducing
//this to a simpler less duplicated form
string name("MyDurableQueue");
string exchange("MyExchange");
string routingKey("MyRoutingKey");
Uuid messageId(true);
string data1("abcdefg");
string data2("hijklmn");
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
queue->create();
Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14);
MessageUtils::addContent(msg, data1);
MessageUtils::addContent(msg, data2);
msg.addAnnotation("abc", "xyz");
queue->deliver(msg);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
Message msg = MessageUtils::get(*queue);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
DummyHandler handler;
MessageUtils::deliver(msg, handler, 100);
BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
BOOST_REQUIRE(contentBody);
BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(Dequeue)
{
cout << test_filename << ".Dequeue: " << flush;
//TODO: reduce the duplication in these tests
string name("MyDurableQueue");
{
string exchange("MyExchange");
string routingKey("MyRoutingKey");
Uuid messageId(true);
string data("abcdefg");
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
queue->create();
Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7);
MessageUtils::addContent(msg, data);
queue->deliver(msg);
QueueCursor cursor;
MessageUtils::get(*queue, &cursor);
queue->dequeue(0, cursor);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
{
cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
uint64_t id(0);
string name("MyDurableExchange");
string type("direct");
FieldTable args;
args.setString("a", "A");
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
store.create(*exchange, qpid::framing::FieldTable());
id = exchange->getPersistenceId();
BOOST_REQUIRE(id);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
Exchange::shared_ptr exchange = registry.get(name);
BOOST_CHECK_EQUAL(id, exchange->getPersistenceId());
BOOST_CHECK_EQUAL(type, exchange->getType());
BOOST_REQUIRE(exchange->isDurable());
BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
store.destroy(*exchange);
}
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
try {
Exchange::shared_ptr exchange = registry.get(name);
BOOST_FAIL("Expected exchange not to be found");
} catch (const SessionException& e) {
BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
}
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
{
cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
{
cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
FieldTable args;
args.setString("a", "A");
args.setString("b", "B");
bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
{
cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
string exchangeName("MyDurableExchange");
string queueName1("MyDurableQueue1");
string queueName2("MyDurableQueue2");
string key("my-routing-key");
FieldTable args;
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
store.create(*exchange, qpid::framing::FieldTable());
store.create(*queue1, qpid::framing::FieldTable());
store.create(*queue2, qpid::framing::FieldTable());
store.bind(*exchange, *queue1, key, args);
store.bind(*exchange, *queue2, key, args);
//delete queue1:
store.destroy(*queue1);
}//db will be closed
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
//ensure recovery works ok:
recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
BOOST_REQUIRE(!queues.find(queueName1).get());
BOOST_REQUIRE(queues.find(queueName2).get());
//delete exchange:
store.destroy(*exchange);
}
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
//ensure recovery works ok:
recover(store, queues, exchanges, links);
try {
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
BOOST_FAIL("Expected exchange not to be found");
} catch (const SessionException& e) {
BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
}
Queue::shared_ptr queue = queues.find(queueName2);
store.destroy(*queue);
}
cout << "ok" << endl;
}
QPID_AUTO_TEST_SUITE_END()