// blob: ee9d37e76dc0c56d57c55eedcee4b7167e4d512a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "MessageUtils.h"
#include "unit_test.h"
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Timer.h"
#include <iostream>
#include <vector>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
using namespace std;
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
namespace qpid {
namespace tests {
/**
 * Consumer stub for the queue tests. It accepts every message offered to it
 * and keeps the most recent cursor/message pair so a test can inspect what
 * the queue dispatched.
 */
class TestConsumer : public virtual Consumer {
  public:
    typedef boost::shared_ptr<TestConsumer> shared_ptr;

    QueueCursor lastCursor;   // cursor handed to the most recent deliver()
    Message lastMessage;      // message handed to the most recent deliver()
    bool received;            // becomes true once deliver() has been called

    // acquire == true subscribes as CONSUMER, otherwise as BROWSER.
    TestConsumer(std::string name = "test", bool acquire = true)
        : Consumer(name, acquire ? CONSUMER : BROWSER, ""),
          received(false)
    {}

    // Remember the delivery and always report it as accepted.
    virtual bool deliver(const QueueCursor& cursor, const Message& message)
    {
        received = true;
        lastMessage = message;
        lastCursor = cursor;
        return true;
    }

    // The remaining callbacks do nothing in these tests.
    void notify() {}
    void cancel() {}
    void acknowledged(const DeliveryRecord&) {}
    OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable
{
Message msg;
public:
FailOnDeliver() : msg(MessageUtils::createMessage()) {}
void deliverTo(const boost::shared_ptr<Queue>& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
Message& getMessage() { return msg; }
};
QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testBound){
    // The queue is told about every binding made to it; unbind() must then
    // use that record to detach the queue from the exchanges, even though
    // one of them has been destroyed in the meantime.
    const std::string bindingKey("my-key");
    FieldTable bindingArgs;
    Queue::shared_ptr queue(new Queue("my-queue"));
    ExchangeRegistry exchanges;

    // One exchange of each kind exercised by this test.
    Exchange::shared_ptr direct = exchanges.declare("my-exchange-1", "direct").first;
    Exchange::shared_ptr fanout = exchanges.declare("my-exchange-2", "fanout").first;
    Exchange::shared_ptr topic = exchanges.declare("my-exchange-3", "topic").first;

    // Bind exchange->queue, and notify the queue of each binding as it is made.
    direct->bind(queue, bindingKey, &bindingArgs);
    queue->bound(direct->getName(), bindingKey, bindingArgs);

    fanout->bind(queue, bindingKey, &bindingArgs);
    queue->bound(fanout->getName(), bindingKey, bindingArgs);

    topic->bind(queue, bindingKey, &bindingArgs);
    queue->bound(topic->getName(), bindingKey, bindingArgs);

    // Remove one exchange from the registry and drop our reference to it.
    exchanges.destroy(fanout->getName());
    fanout.reset();

    // Unbind the queue from every exchange it recorded a binding for.
    queue->unbind(exchanges);

    // FailOnDeliver throws if it is delivered to any queue, so routing it
    // through the surviving exchanges checks the queue is no longer bound.
    FailOnDeliver deliverable;
    direct->route(deliverable);
    topic->route(deliverable);
}
QPID_AUTO_TEST_CASE(testLVQ){
    // Last-value queue keyed on the "key" message property. Messages carry
    // their delivery number ("1", "2", ...) as content so the checks below
    // can tell which message survived for each key.
    QueueSettings settings;
    string key="key";
    settings.lvqKey = key;
    QueueFactory factory;
    Queue::shared_ptr q(factory.create("my-queue", settings));

    // First batch: contents "1".."4"; the fourth message reuses key "a".
    const char* values[] = { "a", "b", "c", "a"};
    for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
        qpid::types::Variant::Map properties;
        properties[key] = values[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
    }
    // Four deliveries over three distinct keys leave three messages.
    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);

    // An acquiring consumer is expected to see "2", "3", then "4":
    // message "1" (key "a") was superseded by message "4".
    TestConsumer::shared_ptr c(new TestConsumer("test", true));
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());

    // Second batch: contents "5".."7", one message per key.
    // The loop indexes values2, the array that also sizes it; the previous
    // code indexed the first-batch array and was only correct by coincidence.
    const char* values2[] = { "a", "b", "c"};
    for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
        qpid::types::Variant::Map properties;
        properties[key] = values2[i];
        q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
    }
    BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
    BOOST_CHECK(q->dispatch(c));
    BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
}
QPID_AUTO_TEST_CASE(testLVQEmptyKey){
    // On a last-value queue, a message that carries the LVQ key and a message
    // that has no properties at all must both be kept.
    const std::string lvqKey = "key";
    QueueSettings settings;
    settings.lvqKey = lvqKey;
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my-queue", settings));

    // Message "one" carries the key ...
    qpid::types::Variant::Map properties;
    properties["key"] = "a";
    queue->deliver(MessageUtils::createMessage(properties, "one"));

    // ... message "two" is delivered with an empty property map.
    properties.clear();
    queue->deliver(MessageUtils::createMessage(properties, "two"));

    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
}
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
queue.deliver(m);
}
}
QPID_AUTO_TEST_CASE(testPurgeExpired) {
    Queue expiring("my-queue");

    // With the helper's defaults, odd-indexed messages get a TTL of 200 and
    // even-indexed ones a TTL of 0 (presumably "never expires" - the check
    // below expects exactly the five even-indexed messages to survive).
    addMessagesToQueue(10, expiring);
    BOOST_CHECK_EQUAL(expiring.getMessageCount(), 10u);

    // Wait 300ms so the TTL-200 messages are past their expiry, then purge.
    ::usleep(300 * 1000);
    expiring.purgeExpired(0);

    BOOST_CHECK_EQUAL(expiring.getMessageCount(), 5u);
}
QPID_AUTO_TEST_CASE(testQueueCleaner) {
    // Declaration order is significant here: the locals are destroyed in
    // reverse order at the end of the test.
    boost::shared_ptr<Poller> poller(new Poller);
    Thread pollerThread(poller.get());
    Timer timer;
    QueueRegistry registry;
    Queue::shared_ptr queue = registry.declare("my-queue", QueueSettings()).first;

    // Five messages with TTL 200 (odd indices) and five with TTL 400 (even).
    addMessagesToQueue(10, *queue, 200, 400);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);

    // Start a cleaner over the registry with a 100ms period.
    QueueCleaner cleaner(registry, poller, &timer);
    cleaner.start(100 * qpid::sys::TIME_MSEC);

    // After 300ms only the TTL-400 half is expected to remain ...
    ::usleep(300 * 1000);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);

    // ... and after a further 300ms the queue is expected to be empty.
    ::usleep(300 * 1000);
    BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);

    // Stop the poller and wait for its thread before the locals go away.
    poller->shutdown();
    pollerThread.join();
}
namespace {
int getIntProperty(const Message& message, const std::string& key)
{
qpid::types::Variant v = message.getProperty(key);
int i(0);
if (!v.isVoid()) i = v;
return i;
}
// Helper for the group tests: dispatch one message from `queue` to consumer
// `c` and check that it carries the expected "GROUP-ID" and "MY-ID"
// properties. On a successful dispatch the consumer's cursor is appended to
// `results` so the caller can later dequeue or release the message. A failed
// dispatch is reported through BOOST_CHECK and nothing else is examined.
void verifyAcquire( Queue::shared_ptr queue,
                    TestConsumer::shared_ptr c,
                    std::deque<QueueCursor>& results,
                    const std::string& expectedGroup,
                    const int expectedId )
{
    const bool dispatched = queue->dispatch(c);
    BOOST_CHECK(dispatched);
    if (!dispatched) return;

    results.push_back(c->lastCursor);
    const std::string actualGroup = c->lastMessage.getPropertyAsString("GROUP-ID");
    const int actualId = getIntProperty(c->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( actualGroup, expectedGroup );
    BOOST_CHECK_EQUAL( actualId, expectedId );
}
// Build a message whose "GROUP-ID" property is `group` and whose "MY-ID"
// property is `id`.
Message createGroupMessage(int id, const std::string& group)
{
    qpid::types::Variant::Map headers;
    headers["MY-ID"] = id;
    headers["GROUP-ID"] = group;
    return MessageUtils::createMessage(headers);
}
}
QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
//
// Verify that consumers of grouped messages own the groups once a message is acquired,
// and release the groups once all acquired messages have been dequeued or requeued
//
// The running "Queue = / Owners=" comments below track the expected queue
// contents and which consumer is expected to own each message's group after
// every step. Each verifyAcquire() call appends the acquired cursor to the
// given deque so that it can be dequeued or released later in the test.
//
QueueSettings settings;
settings.shareGroups = 1;
settings.groupKey = "GROUP-ID";
QueueFactory factory;
Queue::shared_ptr queue(factory.create("my_queue", settings));
std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
queue->deliver(createGroupMessage(i, groups[i]));
}
// Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
// Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
TestConsumer::shared_ptr c1(new TestConsumer("C1"));
TestConsumer::shared_ptr c2(new TestConsumer("C2"));
queue->consume(c1);
queue->consume(c2);
std::deque<QueueCursor> dequeMeC1;
std::deque<QueueCursor> dequeMeC2;
verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
// now let c1 complete the 'a-0' message - this should free the 'a' group
queue->dequeue( 0, dequeMeC1.front() );
dequeMeC1.pop_front();
// Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
// Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
// now c2 should pick up the next 'a-1', since it is oldest free
verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
// Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
// Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
// c1 should only be able to snarf up the first "c" message now...
verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
// Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
// Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
// hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
queue->dequeue( 0, dequeMeC2.front() );
dequeMeC2.pop_front();
// Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
// Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
// b group is free, c is owned by c1 - c1's next get should grab 'b-4'
verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
// Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
// Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
// c2 can now only grab a-2, and that's all
verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
// now C2 can't get any more, since C1 owns "b" and "c" group...
bool gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
// hmmm... what if c1 now dequeues "c-6"? (now only owns b-4)
queue->dequeue( 0, dequeMeC1.front() );
dequeMeC1.pop_front();
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
// Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
// c2 can now grab c-7
verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
// Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
// what happens if C-2 "requeues" a-1 and a-2?
queue->release( dequeMeC2.front() );
dequeMeC2.pop_front();
queue->release( dequeMeC2.front() );
dequeMeC2.pop_front(); // now just has c-7 acquired
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
// Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
// now c1 will grab a-1 and a-2...
verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
// Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
// c2 can now acquire c-8 only
verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
// and c1 can get b-5
verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
// should be no more acquire-able for anyone now:
gotOne = queue->dispatch(c1);
BOOST_CHECK( !gotOne );
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
// release all of C1's acquired messages, then cancel C1
while (!dequeMeC1.empty()) {
queue->release(dequeMeC1.front());
dequeMeC1.pop_front();
}
queue->cancel(c1);
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
// Owners= ---, ---, ---, ---, ^C2, ^C2
// b-4, a-1, a-2, b-5 all should be available, right?
verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
// c2 now completes everything it still holds
while (!dequeMeC2.empty()) {
queue->dequeue(0, dequeMeC2.front());
dequeMeC2.pop_front();
}
// Queue = a-2, b-4, b-5
// Owners= ---, ---, ---
TestConsumer::shared_ptr c3(new TestConsumer("C3"));
queue->consume(c3);
std::deque<QueueCursor> dequeMeC3;
verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
// Queue = a-2, b-4, b-5
// Owners= ^C3, ^C2, ^C2
gotOne = queue->dispatch(c3);
BOOST_CHECK( !gotOne );
verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
while (!dequeMeC2.empty()) {
queue->dequeue(0, dequeMeC2.front());
dequeMeC2.pop_front();
}
// Queue = a-2,
// Owners= ^C3,
queue->deliver(createGroupMessage(9, "a"));
// Queue = a-2, a-9
// Owners= ^C3, ^C3
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
queue->deliver(createGroupMessage(10, "b"));
// Queue = a-2, a-9, b-10
// Owners= ^C3, ^C3, ----
verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
gotOne = queue->dispatch(c3);
BOOST_CHECK( !gotOne );
queue->cancel(c2);
queue->cancel(c3);
}
QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
    //
    // Verify that the same default group name is automatically applied to messages that
    // do not specify a group name.
    //
    // Every dispatch that is expected to succeed is asserted with
    // BOOST_REQUIRE, so a failed dispatch aborts the test case instead of
    // carrying on with a stale lastCursor/lastMessage from the consumer.
    //
    QueueSettings settings;
    settings.shareGroups = 1;
    settings.groupKey = "GROUP-ID";
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my_queue", settings));
    for (int i = 0; i < 3; ++i) {
        qpid::types::Variant::Map properties;
        // no "GROUP-ID" header
        properties["MY-ID"] = i;
        queue->deliver(MessageUtils::createMessage(properties));
    }
    // Queue = 0, 1, 2
    BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());

    TestConsumer::shared_ptr c1(new TestConsumer("C1"));
    TestConsumer::shared_ptr c2(new TestConsumer("C2"));
    queue->consume(c1);
    queue->consume(c2);

    // Cursors of the messages c1 has acquired and not yet dequeued.
    std::deque<QueueCursor> dequeMeC1;

    BOOST_REQUIRE(queue->dispatch(c1)); // c1 now owns default group (acquired 0)
    dequeMeC1.push_back(c1->lastCursor);
    int id = getIntProperty(c1->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 0 );

    bool gotOne = queue->dispatch(c2); // c2 should get nothing
    BOOST_CHECK( !gotOne );

    BOOST_REQUIRE(queue->dispatch(c1)); // c1 now acquires 1
    dequeMeC1.push_back(c1->lastCursor);
    id = getIntProperty(c1->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 1 );

    gotOne = queue->dispatch(c2); // c2 should still get nothing
    BOOST_CHECK( !gotOne );

    // c1 completes both of its messages
    while (!dequeMeC1.empty()) {
        queue->dequeue(0, dequeMeC1.front());
        dequeMeC1.pop_front();
    }

    // now default group should be available...
    BOOST_REQUIRE(queue->dispatch(c2)); // c2 now owns default group (acquired 2)
    // Read the id through the same helper as above, which yields 0 for a void
    // property rather than converting the Variant directly.
    id = getIntProperty(c2->lastMessage, "MY-ID");
    BOOST_CHECK_EQUAL( id, 2 );

    gotOne = queue->dispatch(c1); // c1 should get nothing
    BOOST_CHECK( !gotOne );

    queue->cancel(c1);
    queue->cancel(c2);
}
QPID_AUTO_TEST_CASE(testSetPositionFifo) {
    Queue::shared_ptr queue(new Queue("my-queue", true));
    BOOST_CHECK_EQUAL(queue->getPosition(), SequenceNumber(0));

    // Ten messages with contents "1" .. "10" and no properties.
    const qpid::types::Variant::Map noProperties;
    for (int n = 1; n <= 10; ++n) {
        queue->deliver(MessageUtils::createMessage(noProperties, boost::lexical_cast<string>(n)));
    }

    // Front of the queue, seen through a browser (does not acquire).
    TestConsumer::shared_ptr browser(new TestConsumer("test", false));
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(1u, browser->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK_EQUAL("1", browser->lastMessage.getContent());

    // Back of the queue.
    BOOST_CHECK_EQUAL(10u, queue->getPosition());
    BOOST_CHECK_EQUAL(10u, queue->getMessageCount());

    // Moving the position forward leaves a gap in the sequence numbers:
    // no message is removed and the next delivery is numbered 16.
    queue->setPosition(15);
    BOOST_CHECK_EQUAL(10u, queue->getMessageCount());
    BOOST_CHECK_EQUAL(15u, queue->getPosition());
    queue->deliver(MessageUtils::createMessage(noProperties, "16"));

    // Seek the browser to 9 so the next two dispatches straddle the gap.
    queue->seek(*browser, Queue::MessagePredicate(), 9);
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(10u, browser->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("10", browser->lastMessage.getContent());
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(16u, browser->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("16", browser->lastMessage.getContent());

    // Moving the position backwards truncates the queue.
    queue->setPosition(5);
    BOOST_CHECK_EQUAL(5u, queue->getMessageCount());
    queue->deliver(MessageUtils::createMessage(noProperties, "6a"));

    // A fresh browser, seeked to 4, sees message 5 and then the new message 6.
    browser.reset(new TestConsumer("test", false));
    queue->seek(*browser, Queue::MessagePredicate(), 4);
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(5u, browser->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("5", browser->lastMessage.getContent());
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(6u, browser->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("6a", browser->lastMessage.getContent());
    BOOST_CHECK(!queue->dispatch(browser)); // No more messages.
}
QPID_AUTO_TEST_CASE(testSetPositionLvq) {
    const std::string lvqKey = "key";
    QueueSettings settings;
    settings.lvqKey = lvqKey;
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my-queue", settings));

    // Six deliveries over three keys; contents are "1" .. "6".
    const char* values[] = { "a", "b", "c", "a", "b", "c" };
    const size_t valueCount = sizeof(values)/sizeof(values[0]);
    for (size_t n = 0; n < valueCount; ++n) {
        qpid::types::Variant::Map properties;
        properties[lvqKey] = values[n];
        queue->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(n+1)));
    }
    // One message per key remains.
    BOOST_CHECK_EQUAL(3u, queue->getMessageCount());

    // Front of the queue, seen through a browser (does not acquire).
    TestConsumer::shared_ptr browser(new TestConsumer("test", false));
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(4u, browser->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK_EQUAL("4", browser->lastMessage.getContent());

    // Back of the queue.
    BOOST_CHECK_EQUAL(6u, queue->getPosition());

    // Pull the position back to 5; a fresh browser seeked to 4 is then
    // expected to see message 5 and nothing after it.
    queue->setPosition(5);
    browser.reset(new TestConsumer("test", false));
    queue->seek(*browser, Queue::MessagePredicate(), 4);
    BOOST_CHECK(queue->dispatch(browser));
    BOOST_CHECK_EQUAL(5u, browser->lastMessage.getSequence()); // Numbered from 1
    BOOST_CHECK(!queue->dispatch(browser));
}
QPID_AUTO_TEST_CASE(testSetPositionPriority) {
    QueueSettings settings;
    settings.priorities = 10;
    QueueFactory factory;
    Queue::shared_ptr queue(factory.create("my-queue", settings));

    // Six messages with contents "1" .. "6" and the priorities listed here.
    const int priorities[] = { 1, 2, 3, 2, 1, 3 };
    const size_t messageCount = sizeof(priorities)/sizeof(priorities[0]);
    for (size_t n = 0; n < messageCount; ++n) {
        qpid::types::Variant::Map properties;
        properties["priority"] = priorities[n];
        queue->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(n+1)));
    }

    // Truncation removes messages in fifo order, not priority order:
    // messages 1-3 (priorities 1, 2, 3) are the ones expected to remain.
    queue->setPosition(3);

    // Browse in priority order
    TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(3u, consumer->lastMessage.getSequence());
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(2u, consumer->lastMessage.getSequence());
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(1u, consumer->lastMessage.getSequence());
    BOOST_CHECK(!queue->dispatch(consumer));

    // A new delivery after the truncation is numbered 4 and is visible to
    // the browser that had already reached the end.
    qpid::types::Variant::Map lateProperties;
    lateProperties["priority"] = 4;
    queue->deliver(MessageUtils::createMessage(lateProperties, "4a"));
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(4u, consumer->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("4a", consumer->lastMessage.getContent());

    // But consumers see priority order
    consumer.reset(new TestConsumer("test", true));
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(4u, consumer->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("4a", consumer->lastMessage.getContent());
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(3u, consumer->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("3", consumer->lastMessage.getContent());
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(2u, consumer->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("2", consumer->lastMessage.getContent());
    BOOST_CHECK(queue->dispatch(consumer));
    BOOST_CHECK_EQUAL(1u, consumer->lastMessage.getSequence());
    BOOST_CHECK_EQUAL("1", consumer->lastMessage.getContent());
}
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests