blob: 4125c516989ba424773c8f1ca420b4bd16d9df50 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "unit_test.h"
#include "test_tools.h"
#include "BrokerFixture.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/ListContent.h"
#include "qpid/messaging/ListView.h"
#include "qpid/messaging/MapContent.h"
#include "qpid/messaging/MapView.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Session.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Time.h"
#include <boost/assign.hpp>
#include <boost/format.hpp>
#include <string>
#include <vector>
namespace qpid {
namespace tests {
// Opens the MessagingSessionTests suite; the matching
// QPID_AUTO_TEST_SUITE_END() appears at the end of this file.
QPID_AUTO_TEST_SUITE(MessagingSessionTests)
// Names used unqualified by the fixtures and tests below.
using namespace qpid::messaging;
using namespace qpid;
using qpid::broker::Broker;
using qpid::framing::Uuid;
struct BrokerAdmin
{
qpid::client::Connection connection;
qpid::client::Session session;
BrokerAdmin(uint16_t port)
{
connection.open("localhost", port);
session = connection.newSession();
}
void createQueue(const std::string& name)
{
session.queueDeclare(qpid::client::arg::queue=name);
}
void deleteQueue(const std::string& name)
{
session.queueDelete(qpid::client::arg::queue=name);
}
void createExchange(const std::string& name, const std::string& type)
{
session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type);
}
void deleteExchange(const std::string& name)
{
session.exchangeDelete(qpid::client::arg::exchange=name);
}
bool checkQueueExists(const std::string& name)
{
return session.queueQuery(name).getQueue() == name;
}
bool checkExchangeExists(const std::string& name, std::string& type)
{
qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
type = result.getType();
return !result.getNotFound();
}
~BrokerAdmin()
{
session.close();
connection.close();
}
};
// Base fixture for the messaging tests. On top of BrokerFixture it
// opens a messaging Connection to the broker's TCP port, creates a
// Session on it, and provides a BrokerAdmin bound to the same port.
struct MessagingFixture : public BrokerFixture
{
    Connection connection;
    Session session;
    BrokerAdmin admin;

    // Members are initialised in declaration order: connection,
    // session, then admin. Both the connection URL and the admin use
    // the port reported by broker->getPort(Broker::TCP_TRANSPORT).
    MessagingFixture(Broker::Options opts = Broker::Options()) :
        BrokerFixture(opts),
        connection(Connection::open(
            (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(Broker::TCP_TRANSPORT))).str())),
        session(connection.newSession()),
        admin(broker->getPort(Broker::TCP_TRANSPORT))
    {}

    // Closes the session, then the connection.
    ~MessagingFixture()
    {
        session.close();
        connection.close();
    }

    // Round-trips one message through 'address': creates a receiver
    // and a sender for it, sends a message whose content is a fresh
    // UUID string, checks that it is fetched back within five seconds
    // with identical content, then cancels both the receiver and the
    // sender.
    void ping(const qpid::messaging::Address& address)
    {
        Receiver receiver = session.createReceiver(address);
        Sender sender = session.createSender(address);

        Message sent(Uuid(true).str());
        sender.send(sent);

        Message received;
        const bool arrived = receiver.fetch(received, 5*qpid::sys::TIME_SEC);
        BOOST_CHECK(arrived);
        BOOST_CHECK_EQUAL(sent.getContent(), received.getContent());

        receiver.cancel();
        sender.cancel();
    }
};
// Messaging fixture that owns a single queue: the queue is created
// through the admin helper on construction and deleted again on
// destruction.
struct QueueFixture : MessagingFixture
{
    // Name of the queue managed by this fixture.
    std::string queue;

    QueueFixture(const std::string& name = "test-queue")
        : queue(name)
    {
        admin.createQueue(queue);
    }

    ~QueueFixture()
    {
        admin.deleteQueue(queue);
    }
};
// Messaging fixture that owns a single exchange: the exchange is
// created with the requested type on construction and deleted on
// destruction.
struct TopicFixture : MessagingFixture
{
    // Name of the exchange managed by this fixture.
    std::string topic;

    TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout")
        : topic(name)
    {
        admin.createExchange(topic, type);
    }

    ~TopicFixture()
    {
        admin.deleteExchange(topic);
    }
};
// Messaging fixture that owns several queues (by default "q1", "q2"
// and "q3"). Every queue is created on construction and deleted on
// destruction, in the order given in 'queues'.
struct MultiQueueFixture : MessagingFixture
{
    typedef std::vector<std::string>::const_iterator const_iterator;

    // Names of the queues managed by this fixture.
    std::vector<std::string> queues;

    MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3"))
        : queues(names)
    {
        for (std::vector<std::string>::size_type n = 0; n < queues.size(); ++n) {
            admin.createQueue(queues[n]);
        }
    }

    ~MultiQueueFixture()
    {
        for (std::vector<std::string>::size_type n = 0; n < queues.size(); ++n) {
            admin.deleteQueue(queues[n]);
        }
    }
};
// Fetches up to 'count' messages from 'receiver' and returns their
// contents in arrival order. Stops early as soon as a fetch fails to
// deliver a message within 'timeout', so the result may hold fewer
// than 'count' entries.
std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
{
    std::vector<std::string> contents;
    Message current;
    int fetched = 0;
    while (fetched < count && receiver.fetch(current, timeout)) {
        contents.push_back(current.getContent());
        fetched++;
    }
    return contents;
}
// Sends 'count' messages through 'sender'. Message contents are
// "<base>_<n>" with n running from 'start' up to start + count - 1.
void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message")
{
    const uint end = start + count;
    for (uint n = start; n < end; ++n) {
        const std::string content = (boost::format("%1%_%2%") % base % n).str();
        sender.send(Message(content));
    }
}
// Counterpart of send(): fetches 'count' messages from 'receiver',
// each with the given timeout, and checks that their contents are
// "<base>_<n>" for n = start .. start + count - 1, in that order.
void receive(Receiver& receiver, uint count = 1, uint start = 1,
             const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
{
    const uint end = start + count;
    for (uint n = start; n < end; ++n) {
        BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(),
                          (boost::format("%1%_%2%") % base % n).str());
    }
}
// Sends one message to a queue, fetches it back, acknowledges it and
// checks that the content survived the round trip.
QPID_AUTO_TEST_CASE(testSimpleSendReceive)
{
    QueueFixture fix;

    Sender sender = fix.session.createSender(fix.queue);
    Message sent("test-message");
    sender.send(sent);

    Receiver receiver = fix.session.createReceiver(fix.queue);
    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
    fix.session.acknowledge();

    BOOST_CHECK_EQUAL(received.getContent(), sent.getContent());
}
// Sends the same message ten times, each time with header "a" set to
// the loop index, then checks that content and header value come back
// unchanged and in order.
QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
{
    QueueFixture fix;
    const uint total = 10;

    Sender sender = fix.session.createSender(fix.queue);
    Message sent("test-message");
    for (uint n = 0; n < total; ++n) {
        sent.getHeaders()["a"] = n;
        sender.send(sent);
    }

    Receiver receiver = fix.session.createReceiver(fix.queue);
    Message received;
    for (uint n = 0; n < total; ++n) {
        const bool arrived = receiver.fetch(received, 5 * qpid::sys::TIME_SEC);
        BOOST_CHECK(arrived);
        BOOST_CHECK_EQUAL(received.getContent(), sent.getContent());
        BOOST_CHECK_EQUAL(received.getHeaders()["a"].asUint32(), n);
        fix.session.acknowledge();
    }
}
// Creating a sender for a node that does not exist must throw
// InvalidAddress, both with no create policy and with a create policy
// that only applies to receivers.
QPID_AUTO_TEST_CASE(testSenderError)
{
    MessagingFixture fix;
    ScopedSuppressLogging quiet;

    BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"),
                      qpid::messaging::InvalidAddress);

    // Replace the session with a fresh one before the second attempt.
    fix.session = fix.connection.newSession();

    BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"),
                      qpid::messaging::InvalidAddress);
}
// Creating a receiver for a node that does not exist must throw
// InvalidAddress, both with no create policy and with a create policy
// that only applies to senders.
QPID_AUTO_TEST_CASE(testReceiverError)
{
    MessagingFixture fix;
    ScopedSuppressLogging quiet;

    BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"),
                      qpid::messaging::InvalidAddress);

    // Replace the session with a fresh one before the second attempt.
    fix.session = fix.connection.newSession();

    BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"),
                      qpid::messaging::InvalidAddress);
}
// Publishes "one" .. "five" to a topic while subscribers join (and one
// leaves) between sends, then checks that each subscriber sees exactly
// the messages sent while it was subscribed.
QPID_AUTO_TEST_CASE(testSimpleTopic)
{
    TopicFixture fix;
    Sender sender = fix.session.createSender(fix.topic);

    // "one" is sent before any subscriber exists.
    Message update("one");
    sender.send(update);

    Receiver sub1 = fix.session.createReceiver(fix.topic);
    sub1.setCapacity(10u);
    update.setContent("two");
    sender.send(update);

    Receiver sub2 = fix.session.createReceiver(fix.topic);
    sub2.setCapacity(10u);
    update.setContent("three");
    sender.send(update);

    Receiver sub3 = fix.session.createReceiver(fix.topic);
    sub3.setCapacity(10u);
    update.setContent("four");
    sender.send(update);

    // sub2 reads its two messages and is cancelled before "five".
    BOOST_CHECK_EQUAL(fetch(sub2, 2), boost::assign::list_of<std::string>("three")("four"));
    sub2.cancel();

    update.setContent("five");
    sender.send(update);

    BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five"));
    BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five"));

    // TODO: should fetching from a cancelled receiver raise an error
    // rather than just return false?
    Message leftover;
    BOOST_CHECK(!sub2.fetch(leftover, 0));
    // TODO: check pending messages...
}
// Creates one receiver per queue, sends "Message_<k>" to the k-th
// queue, and checks that repeatedly fetching from
// session.nextReceiver() yields the messages in that same order.
QPID_AUTO_TEST_CASE(testNextReceiver)
{
    MultiQueueFixture fix;

    for (uint q = 0; q < fix.queues.size(); q++) {
        Receiver receiver = fix.session.createReceiver(fix.queues[q]);
        receiver.setCapacity(10u);
    }

    for (uint q = 0; q < fix.queues.size(); q++) {
        Sender sender = fix.session.createSender(fix.queues[q]);
        Message outgoing((boost::format("Message_%1%") % (q+1)).str());
        sender.send(outgoing);
    }

    for (uint q = 0; q < fix.queues.size(); q++) {
        Message incoming;
        BOOST_CHECK(fix.session.nextReceiver().fetch(incoming, qpid::sys::TIME_SEC));
        BOOST_CHECK_EQUAL(incoming.getContent(), (boost::format("Message_%1%") % (q+1)).str());
    }
}
// Encodes a two-entry map into a message with MapContent, sends it,
// and checks that MapView on the fetched message exposes both
// entries.
QPID_AUTO_TEST_CASE(testMapMessage)
{
    QueueFixture fix;
    Sender sender = fix.session.createSender(fix.queue);

    Message sent;
    MapContent entries(sent);
    entries["abc"] = "def";
    entries["pi"] = 3.14f;
    entries.encode();
    sender.send(sent);

    Receiver receiver = fix.session.createReceiver(fix.queue);
    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
    MapView decoded(received);
    BOOST_CHECK_EQUAL(decoded["abc"].asString(), "def");
    BOOST_CHECK_EQUAL(decoded["pi"].asFloat(), 3.14f);

    fix.session.acknowledge();
}
// Encodes a four-element list into a message with ListContent, sends
// it, and checks size, front/back and element-by-element iteration
// through ListView on the fetched message.
QPID_AUTO_TEST_CASE(testListMessage)
{
    QueueFixture fix;
    Sender sender = fix.session.createSender(fix.queue);

    Message sent;
    ListContent items(sent);
    items.push_back(Variant("abc"));
    items.push_back(Variant(1234));
    items.push_back(Variant("def"));
    items.push_back(Variant(56.789));
    items.encode();
    sender.send(sent);

    Receiver receiver = fix.session.createReceiver(fix.queue);
    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
    ListView decoded(received);

    BOOST_CHECK_EQUAL(decoded.size(), items.size());
    BOOST_CHECK_EQUAL(decoded.front().asString(), "abc");
    BOOST_CHECK_EQUAL(decoded.back().asDouble(), 56.789);

    // Walk the list and verify each element in turn.
    ListView::const_iterator pos = decoded.begin();
    BOOST_CHECK(pos != decoded.end());
    BOOST_CHECK_EQUAL(pos->asString(), "abc");

    ++pos;
    BOOST_CHECK(pos != decoded.end());
    BOOST_CHECK_EQUAL(pos->asInt64(), 1234);

    ++pos;
    BOOST_CHECK(pos != decoded.end());
    BOOST_CHECK_EQUAL(pos->asString(), "def");

    ++pos;
    BOOST_CHECK(pos != decoded.end());
    BOOST_CHECK_EQUAL(pos->asDouble(), 56.789);

    // The fourth element must be the last one.
    ++pos;
    BOOST_CHECK(pos == decoded.end());

    fix.session.acknowledge();
}
// Sends two messages, rejects the first one after fetching it, and
// checks that the next fetch returns the second message.
QPID_AUTO_TEST_CASE(testReject)
{
    QueueFixture fix;
    Sender sender = fix.session.createSender(fix.queue);

    Message toReject("reject-me");
    sender.send(toReject);
    Message toAccept("accept-me");
    sender.send(toAccept);

    Receiver receiver = fix.session.createReceiver(fix.queue);
    Message received = receiver.fetch(5 * qpid::sys::TIME_SEC);
    BOOST_CHECK_EQUAL(received.getContent(), toReject.getContent());
    fix.session.reject(received);

    received = receiver.fetch(5 * qpid::sys::TIME_SEC);
    BOOST_CHECK_EQUAL(received.getContent(), toAccept.getContent());
    fix.session.acknowledge();
}
// Checks the available() counters of the session and of two receivers
// as messages are fetched: ten "A_<n>" messages go to the first queue
// and five "B_<n>" messages to the second.
QPID_AUTO_TEST_CASE(testAvailable)
{
    MultiQueueFixture fix;

    Receiver receiverA = fix.session.createReceiver(fix.queues[0]);
    receiverA.setCapacity(100);
    Receiver receiverB = fix.session.createReceiver(fix.queues[1]);
    receiverB.setCapacity(100);

    Sender senderA = fix.session.createSender(fix.queues[0]);
    Sender senderB = fix.session.createSender(fix.queues[1]);

    for (uint n = 1; n <= 10; ++n) {
        senderA.send(Message((boost::format("A_%1%") % n).str()));
    }
    for (uint n = 1; n <= 5; ++n) {
        senderB.send(Message((boost::format("B_%1%") % n).str()));
    }

    // TODO: is there any way to avoid an arbitrary sleep while waiting
    // for the messages to be dispatched?
    qpid::sys::sleep(1);

    // First phase: take one message from each receiver per iteration,
    // so the session-wide count drops by two each time round.
    for (uint i = 0; i < 5; ++i) {
        BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i);
        BOOST_CHECK_EQUAL(receiverA.available(), 10u - i);
        BOOST_CHECK_EQUAL(receiverA.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
        BOOST_CHECK_EQUAL(receiverB.available(), 5u - i);
        BOOST_CHECK_EQUAL(receiverB.fetch().getContent(), (boost::format("B_%1%") % (i+1)).str());
        fix.session.acknowledge();
    }

    // Second phase: only the first receiver has messages left, so the
    // session count and that receiver's count are expected to agree.
    for (uint i = 5; i < 10; ++i) {
        BOOST_CHECK_EQUAL(fix.session.available(), 10u - i);
        BOOST_CHECK_EQUAL(receiverA.available(), 10u - i);
        BOOST_CHECK_EQUAL(receiverA.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
    }
}
// Checks session.pendingAck(): zero before acknowledge() is called,
// ten immediately after acknowledging ten fetched messages, and zero
// again once sync() has returned.
QPID_AUTO_TEST_CASE(testPendingAck)
{
    QueueFixture fix;

    Sender sender = fix.session.createSender(fix.queue);
    for (uint n = 1; n <= 10; ++n) {
        sender.send(Message((boost::format("Message_%1%") % n).str()));
    }

    Receiver receiver = fix.session.createReceiver(fix.queue);
    for (uint n = 1; n <= 10; ++n) {
        BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % n).str());
    }

    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
    fix.session.acknowledge();
    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u);
    fix.session.sync();
    BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
}
// Checks sender.pending(): ten right after sending ten messages, zero
// once the session has been synced.
QPID_AUTO_TEST_CASE(testPendingSend)
{
    QueueFixture fix;
    Sender sender = fix.session.createSender(fix.queue);
    send(sender, 10);

    // Note: these expectations depend on 'inside knowledge' of the
    // sender implementation; the test is simple enough that the point
    // at which completion information reaches the client can be
    // predicted. TODO: is there a better way of testing this?
    BOOST_CHECK_EQUAL(sender.pending(), 10u);
    fix.session.sync();
    BOOST_CHECK_EQUAL(sender.pending(), 0u);

    // Drain the queue and acknowledge what was received.
    Receiver receiver = fix.session.createReceiver(fix.queue);
    receive(receiver, 10);
    fix.session.acknowledge();
}
// Two receivers created with {browse:true} each read all ten
// messages, after which an ordinary receiver on the same queue is
// still expected to receive all ten.
QPID_AUTO_TEST_CASE(testBrowse)
{
    QueueFixture fix;
    Sender sender = fix.session.createSender(fix.queue);
    send(sender, 10);

    const std::string browseAddress = fix.queue + "; {browse:true}";

    Receiver firstBrowser = fix.session.createReceiver(browseAddress);
    receive(firstBrowser, 10);

    Receiver secondBrowser = fix.session.createReceiver(browseAddress);
    receive(secondBrowser, 10);

    Receiver consumer = fix.session.createReceiver(fix.queue);
    receive(consumer, 10);
    fix.session.acknowledge();
}
// Fixture for the queue create-policy tests. It holds the address
// under test and, on destruction, deletes the queue named by that
// address.
struct QueueCreatePolicyFixture : public MessagingFixture
{
    qpid::messaging::Address address;

    QueueCreatePolicyFixture(const std::string& a)
        : address(a)
    {}

    ~QueueCreatePolicyFixture()
    {
        admin.deleteQueue(address.getName());
    }

    // Pings the address, then checks through the admin helper that a
    // queue with the address's name exists.
    void test()
    {
        ping(address);
        const bool queueExists = admin.checkQueueExists(address.getName());
        BOOST_CHECK(queueExists);
    }
};
// create:always with node type queue: the ping in fix.test() is the
// first use of the address, and the queue must exist afterwards.
QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
{
    QueueCreatePolicyFixture fix(
        "#; {create:always, node-properties:{type:queue}}");
    fix.test();
}
// create:receiver with node type queue: a receiver is created for the
// address before the ping-and-exists check, and cancelled afterwards.
QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
{
    QueueCreatePolicyFixture fix(
        "#; {create:receiver, node-properties:{type:queue}}");
    Receiver receiver = fix.session.createReceiver(fix.address);
    fix.test();
    receiver.cancel();
}
// create:sender with node type queue: a sender is created for the
// address before the ping-and-exists check, and cancelled afterwards.
QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
{
    QueueCreatePolicyFixture fix(
        "#; {create:sender, node-properties:{type:queue}}");
    Sender sender = fix.session.createSender(fix.address);
    fix.test();
    sender.cancel();
}
// Fixture for the exchange create-policy tests. It holds the address
// under test together with the exchange type that is expected, and
// deletes the exchange named by the address on destruction.
struct ExchangeCreatePolicyFixture : public MessagingFixture
{
    qpid::messaging::Address address;
    // Exchange type the created exchange is expected to report.
    const std::string exchangeType;

    ExchangeCreatePolicyFixture(const std::string& a, const std::string& t)
        : address(a),
          exchangeType(t)
    {}

    ~ExchangeCreatePolicyFixture()
    {
        admin.deleteExchange(address.getName());
    }

    // Pings the address, then checks that an exchange with the
    // address's name exists and reports the expected type.
    void test()
    {
        ping(address);
        std::string reportedType;
        const bool exchangeExists = admin.checkExchangeExists(address.getName(), reportedType);
        BOOST_CHECK(exchangeExists);
        BOOST_CHECK_EQUAL(exchangeType, reportedType);
    }
};
// create:always with node type topic and no explicit exchange type:
// the resulting exchange is expected to report type "topic".
QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
{
    ExchangeCreatePolicyFixture fix(
        "#; {create:always, node-properties:{type:topic}}",
        "topic");
    fix.test();
}
// create:receiver with x-properties requesting a fanout exchange: a
// receiver is created for the address before the check and cancelled
// afterwards; the exchange is expected to report type "fanout".
QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
{
    ExchangeCreatePolicyFixture fix(
        "#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}",
        "fanout");
    Receiver receiver = fix.session.createReceiver(fix.address);
    fix.test();
    receiver.cancel();
}
// create:sender with x-properties requesting a direct exchange: a
// sender is created for the address before the check and cancelled
// afterwards; the exchange is expected to report type "direct".
QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
{
    ExchangeCreatePolicyFixture fix(
        "#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}",
        "direct");
    Sender sender = fix.session.createSender(fix.address);
    fix.test();
    sender.cancel();
}
struct DeletePolicyFixture : public MessagingFixture
{
enum Mode {RECEIVER, SENDER, ALWAYS, NEVER};
std::string getPolicy(Mode mode)
{
switch (mode) {
case SENDER:
return "{delete:sender}";
case RECEIVER:
return "{delete:receiver}";
case ALWAYS:
return "{delete:always}";
case NEVER:
return "{delete:never}";
}
return "";
}
void testAll()
{
test(RECEIVER);
test(SENDER);
test(ALWAYS);
test(NEVER);
}
virtual ~DeletePolicyFixture() {}
virtual void create(const qpid::messaging::Address&) = 0;
virtual void destroy(const qpid::messaging::Address&) = 0;
virtual bool exists(const qpid::messaging::Address&) = 0;
void test(Mode mode)
{
qpid::messaging::Address address("#; " + getPolicy(mode));
create(address);
Sender s = session.createSender(address);
Receiver r = session.createReceiver(address);
switch (mode) {
case RECEIVER:
s.cancel();
BOOST_CHECK(exists(address));
r.cancel();
BOOST_CHECK(!exists(address));
break;
case SENDER:
r.cancel();
BOOST_CHECK(exists(address));
s.cancel();
BOOST_CHECK(!exists(address));
break;
case ALWAYS:
s.cancel();
BOOST_CHECK(!exists(address));
break;
case NEVER:
r.cancel();
BOOST_CHECK(exists(address));
s.cancel();
BOOST_CHECK(exists(address));
destroy(address);
}
}
};
// Delete-policy fixture whose node is a queue; every operation is
// forwarded to the admin helper using the name taken from the
// address.
struct QueueDeletePolicyFixture : DeletePolicyFixture
{
    void create(const qpid::messaging::Address& address)
    {
        const std::string name = address.getName();
        admin.createQueue(name);
    }

    void destroy(const qpid::messaging::Address& address)
    {
        const std::string name = address.getName();
        admin.deleteQueue(name);
    }

    bool exists(const qpid::messaging::Address& address)
    {
        const std::string name = address.getName();
        return admin.checkQueueExists(name);
    }
};
// Delete-policy fixture whose node is an exchange of a configurable
// type ("topic" unless specified otherwise).
struct ExchangeDeletePolicyFixture : DeletePolicyFixture
{
    // Type used when creating the exchange and required by exists().
    const std::string exchangeType;

    ExchangeDeletePolicyFixture(const std::string type = "topic")
        : exchangeType(type)
    {}

    void create(const qpid::messaging::Address& address)
    {
        admin.createExchange(address.getName(), exchangeType);
    }

    void destroy(const qpid::messaging::Address& address)
    {
        admin.deleteExchange(address.getName());
    }

    // True only when the exchange is found and its reported type
    // equals exchangeType.
    bool exists(const qpid::messaging::Address& address)
    {
        std::string reportedType;
        if (!admin.checkExchangeExists(address.getName(), reportedType)) {
            return false;
        }
        return reportedType == exchangeType;
    }
};
// Runs every delete-policy scenario against a queue node.
QPID_AUTO_TEST_CASE(testDeletePolicyQueue)
{
    QueueDeletePolicyFixture queueFixture;
    queueFixture.testAll();
}
// Runs every delete-policy scenario against an exchange node, using
// the fixture's default exchange type.
QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
{
    ExchangeDeletePolicyFixture exchangeFixture;
    exchangeFixture.testAll();
}
// Exercises the assert policy on queue "q": an assertion only takes
// effect for the link kind named in the policy, and a mismatch in the
// asserted properties must raise InvalidAddress.
QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
{
    MessagingFixture fix;

    // Create the queue (non-durable, max-count 100) and assert those
    // same properties for both link kinds.
    std::string createAndAssert = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
    Sender matchingSender = fix.session.createSender(createAndAssert);
    matchingSender.cancel();
    Receiver matchingReceiver = fix.session.createReceiver(createAndAssert);
    matchingReceiver.cancel();

    // durable:true does not match; asserted for receivers only, so the
    // sender is created and the receiver is refused.
    std::string receiverAssert = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
    Sender uncheckedSender = fix.session.createSender(receiverAssert);
    uncheckedSender.cancel();
    BOOST_CHECK_THROW(fix.session.createReceiver(receiverAssert), qpid::messaging::InvalidAddress);

    // max-count 99 does not match; asserted for senders only, so the
    // sender is refused and the receiver is created.
    std::string senderAssert = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
    BOOST_CHECK_THROW(fix.session.createSender(senderAssert), qpid::messaging::InvalidAddress);
    Receiver uncheckedReceiver = fix.session.createReceiver(senderAssert);
    uncheckedReceiver.cancel();

    fix.admin.deleteQueue("q");
}
// A sender can be looked up again on its session by name, the handle
// obtained that way is usable for sending, and an unknown name raises
// KeyError.
QPID_AUTO_TEST_CASE(testGetSender)
{
    QueueFixture fix;

    std::string senderName = fix.session.createSender(fix.queue).getName();
    Sender sender = fix.session.getSender(senderName);
    BOOST_CHECK_EQUAL(senderName, sender.getName());

    Message sent(Uuid(true).str());
    sender.send(sent);

    Message received;
    BOOST_CHECK(fix.session.createReceiver(fix.queue).fetch(received));
    BOOST_CHECK_EQUAL(sent.getContent(), received.getContent());

    BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"), qpid::messaging::KeyError);
}
// A receiver can be looked up again on its session by name, the
// handle obtained that way is usable for fetching, and an unknown
// name raises KeyError.
QPID_AUTO_TEST_CASE(testGetReceiver)
{
    QueueFixture fix;

    std::string receiverName = fix.session.createReceiver(fix.queue).getName();
    Receiver receiver = fix.session.getReceiver(receiverName);
    BOOST_CHECK_EQUAL(receiverName, receiver.getName());

    Message sent(Uuid(true).str());
    fix.session.createSender(fix.queue).send(sent);

    Message received;
    BOOST_CHECK(receiver.fetch(received));
    BOOST_CHECK_EQUAL(sent.getContent(), received.getContent());

    BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"), qpid::messaging::KeyError);
}
// A session created with an explicit name can be retrieved from the
// connection by that name and used for a send/fetch round trip; an
// unknown session name raises KeyError.
QPID_AUTO_TEST_CASE(testGetSessionFromConnection)
{
    QueueFixture fix;

    fix.connection.newSession("my-session");
    Session named = fix.connection.getSession("my-session");

    Message sent(Uuid(true).str());
    named.createSender(fix.queue).send(sent);

    Message received;
    BOOST_CHECK(named.createReceiver(fix.queue).fetch(received));
    BOOST_CHECK_EQUAL(sent.getContent(), received.getContent());

    BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"), qpid::messaging::KeyError);
}
// Navigates sender -> session -> connection to create a session named
// "incoming", then looks that session up on the fixture's connection
// and uses it to fetch the message that was sent earlier.
QPID_AUTO_TEST_CASE(testGetConnectionFromSession)
{
    QueueFixture fix;

    Message sent(Uuid(true).str());
    Sender sender = fix.session.createSender(fix.queue);
    sender.send(sent);

    Message received;
    sender.getSession().getConnection().newSession("incoming");
    BOOST_CHECK(fix.connection.getSession("incoming").createReceiver(fix.queue).fetch(received));
    BOOST_CHECK_EQUAL(sent.getContent(), received.getContent());
}
// Commit/rollback behaviour across two sessions sharing one queue.
// Both sessions are created with newSession(true) -- presumably the
// transactional flag, given the commit()/rollback() calls below.
QPID_AUTO_TEST_CASE(testTx)
{
    QueueFixture fix;

    Session first = fix.connection.newSession(true);
    Session second = fix.connection.newSession(true);
    Sender firstSender = first.createSender(fix.queue);
    Sender secondSender = second.createSender(fix.queue);
    Receiver firstReceiver = first.createReceiver(fix.queue);
    Receiver secondReceiver = second.createReceiver(fix.queue);
    Message scratch;

    // Both sessions send five messages, but only the second commits.
    send(firstSender, 5, 1, "A");
    send(secondSender, 5, 1, "B");
    second.commit();

    // Only the committed "B" messages should be received.
    receive(firstReceiver, 5, 1, "B");
    BOOST_CHECK(!firstReceiver.fetch(scratch, 0));//check there are no more messages

    // After the first session rolls back, the second session can
    // receive the same five messages.
    first.rollback();
    receive(secondReceiver, 5, 1, "B");
    BOOST_CHECK(!secondReceiver.fetch(scratch, 0));//check there are no more messages

    // After the second session rolls back, the first session receives
    // them once more and this time commits.
    second.rollback();
    receive(firstReceiver, 5, 1, "B");
    BOOST_CHECK(!firstReceiver.fetch(scratch, 0));//check there are no more messages
    first.commit();

    //check neither receiver gets any more messages:
    BOOST_CHECK(!firstReceiver.fetch(scratch, 0));
    BOOST_CHECK(!secondReceiver.fetch(scratch, 0));
}
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests