// blob: f35524c0c04abbaef411542a33bb089deebd7105 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "unit_test.h"
#include "test_tools.h"
#include "BrokerFixture.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Time.h"
#include "qpid/client/Session.h"
#include "qpid/client/Message.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/optional.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/format.hpp>
#include <vector>
namespace qpid {
namespace tests {
QPID_AUTO_TEST_SUITE(ClientSessionTest)
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
using qpid::broker::BrokerOptions;
using std::string;
using std::cout;
using std::endl;
/**
 * Message listener that owns a SubscriptionManager.
 *
 * run() subscribes this object to the queue called `name` and then calls
 * the manager's run(); received() records each message and stops the
 * manager once `expected` deliveries have been counted down to zero.
 */
struct DummyListener : public sys::Runnable, public MessageListener {
    std::vector<Message> messages;  // messages in the order received() was called
    string name;                    // queue name handed to subscribe() in run()
    uint expected;                  // deliveries still outstanding before stop()
    SubscriptionManager submgr;

    DummyListener(Session& session, const string& n, uint ex)
        : name(n), expected(ex), submgr(session)
    {}

    // Subscribe to the queue, then hand control to the subscription manager.
    void run()
    {
        submgr.subscribe(*this, name);
        submgr.run();
    }

    // Store the message; stop the manager when the countdown reaches zero.
    void received(Message& msg)
    {
        messages.push_back(msg);
        --expected;
        if (expected == 0) {
            submgr.stop();
        }
    }
};
/**
 * Listener that accumulates messages under a Monitor so that another
 * thread can block in waitFor() until enough messages have arrived.
 */
struct SimpleListener : public MessageListener
{
    Monitor lock;                   // guards `messages` and signals new arrivals
    std::vector<Message> messages;

    // Append the message while holding the lock and notify all waiters.
    void received(Message& msg)
    {
        Monitor::ScopedLock guard(lock);
        messages.push_back(msg);
        lock.notifyAll();
    }

    // Block until at least n messages have been stored. There is no timeout.
    void waitFor(const uint n)
    {
        Monitor::ScopedLock guard(lock);
        for (;;) {
            if (messages.size() >= n) {
                break;
            }
            lock.wait();
        }
    }
};
struct ClientSessionFixture : public SessionFixture
{
ClientSessionFixture(const BrokerOptions& opts = BrokerOptions()) : SessionFixture(opts) {
session.queueDeclare(arg::queue="my-queue");
}
};
// Declares an exclusive, auto-delete queue with an alternate exchange on a
// fresh session and checks that queueQuery reports those properties back.
QPID_AUTO_TEST_CASE(testQueueQuery) {
    ClientSessionFixture fix;
    fix.session = fix.connection.newSession();

    const string queueName("q");
    const string altExchange("amq.fanout");
    fix.session.queueDeclare(arg::queue=queueName, arg::alternateExchange=altExchange,
                             arg::exclusive=true, arg::autoDelete=true);

    QueueQueryResult info = fix.session.queueQuery(queueName);
    BOOST_CHECK_EQUAL(false, info.getDurable());
    BOOST_CHECK_EQUAL(true, info.getExclusive());
    BOOST_CHECK_EQUAL(altExchange, info.getAlternateExchange());
}
// Sends 100 numbered messages to "my-queue", then runs a DummyListener in
// the calling thread and checks that it saw all of them in sending order.
QPID_AUTO_TEST_CASE(testDispatcher)
{
    ClientSessionFixture fix;
    fix.session = fix.connection.newSession();

    const size_t total = 100;
    for (size_t sent = 0; sent != total; ++sent) {
        const string payload = boost::lexical_cast<string>(sent);
        fix.session.messageTransfer(arg::content=Message(payload, "my-queue"));
    }

    DummyListener listener(fix.session, "my-queue", total);
    listener.run();

    BOOST_CHECK_EQUAL(total, listener.messages.size());
    for (size_t idx = 0; idx != total; ++idx) {
        BOOST_CHECK_EQUAL(boost::lexical_cast<string>(idx), listener.messages[idx].getData());
    }
}
// Same check as testDispatcher, but the listener runs on its own thread and
// the 10 messages are sent after that thread has been started.
QPID_AUTO_TEST_CASE(testDispatcherThread)
{
    ClientSessionFixture fix;
    fix.session = fix.connection.newSession();

    const size_t total = 10;
    DummyListener listener(fix.session, "my-queue", total);
    sys::Thread dispatcher(listener);

    for (size_t sent = 0; sent != total; ++sent) {
        const string payload = boost::lexical_cast<string>(sent);
        fix.session.messageTransfer(arg::content=Message(payload, "my-queue"));
    }
    dispatcher.join();

    BOOST_CHECK_EQUAL(total, listener.messages.size());
    for (size_t idx = 0; idx != total; ++idx) {
        BOOST_CHECK_EQUAL(boost::lexical_cast<string>(idx), listener.messages[idx].getData());
    }
}
// Issuing a command on a suspended session must raise NotAttachedException.
QPID_AUTO_TEST_CASE(testUseSuspendedError)
{
    ClientSessionFixture fix;
    fix.session.timeout(60);  // NOTE(review): presumably a session timeout in seconds - confirm
    fix.session.suspend();

    bool rejected = false;
    try {
        fix.session.exchangeQuery(arg::exchange="amq.fanout");
    } catch (const NotAttachedException&) {
        rejected = true;
    }
    if (!rejected) {
        BOOST_FAIL("Expected session suspended exception");
    }
}
// Subscribes to a queue and publishes to it from the same session, with the
// fixture's subscription manager dispatching on a separate thread.
QPID_AUTO_TEST_CASE(testSendToSelf) {
    ClientSessionFixture fix;
    SimpleListener receiver;
    const string queueName("myq");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);
    fix.subs.subscribe(receiver, queueName);
    sys::Thread dispatcher(fix.subs);  // start dispatcher thread

    const string payload("msg");
    Message outgoing(payload, queueName);
    const uint total = 10;
    uint sent = 0;
    while (sent < total) {
        fix.session.messageTransfer(arg::content=outgoing);
        ++sent;
    }

    // Wait for every delivery before tearing the subscription down.
    receiver.waitFor(total);
    fix.subs.cancel(queueName);
    fix.subs.stop();
    dispatcher.join();
    fix.session.close();

    BOOST_CHECK_EQUAL(receiver.messages.size(), total);
    for (uint idx = 0; idx != total; ++idx) {
        BOOST_CHECK_EQUAL(receiver.messages[idx].getData(), payload);
    }
}
// Subscribes a LocalQueue with FlowControl(2, UNLIMITED, false), sends three
// messages, and checks that the third only arrives after flow control is
// switched to unlimited.
QPID_AUTO_TEST_CASE(testLocalQueue) {
    ClientSessionFixture fix;
    const string queueName("lq");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);

    LocalQueue local;
    fix.subs.subscribe(local, queueName, FlowControl(2, FlowControl::UNLIMITED, false));

    // Publish "foo0", "foo1" and "foo2".
    for (int n = 0; n < 3; ++n) {
        const string payload = "foo" + boost::lexical_cast<string>(n);
        fix.session.messageTransfer(arg::content=Message(payload, queueName));
    }

    BOOST_CHECK_EQUAL("foo0", local.pop().getData());
    BOOST_CHECK_EQUAL("foo1", local.pop().getData());
    BOOST_CHECK(local.empty()); // Credit exhausted.

    fix.subs.getSubscription(queueName).setFlowControl(FlowControl::unlimited());
    BOOST_CHECK_EQUAL("foo2", local.pop().getData());
}
struct DelayedTransfer : sys::Runnable
{
ClientSessionFixture& fixture;
DelayedTransfer(ClientSessionFixture& f) : fixture(f) {}
void run()
{
qpid::sys::sleep(1);
fixture.session.messageTransfer(arg::content=Message("foo2", "getq"));
}
};
// Exercises SubscriptionManager::get: messages already queued, an empty
// queue, and a timed get satisfied by a message sent from another thread.
QPID_AUTO_TEST_CASE(testGet) {
    ClientSessionFixture fix;
    const string queueName("getq");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);
    fix.session.messageTransfer(arg::content=Message("foo0", queueName));
    fix.session.messageTransfer(arg::content=Message("foo1", queueName));

    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, queueName, TIME_SEC));
    BOOST_CHECK_EQUAL("foo0", fetched.getData());
    BOOST_CHECK(fix.subs.get(fetched, queueName, TIME_SEC));
    BOOST_CHECK_EQUAL("foo1", fetched.getData());

    // Queue is drained: a get without a timeout argument must report no message.
    BOOST_CHECK(!fix.subs.get(fetched, queueName));

    DelayedTransfer lateSender(fix);
    Thread senderThread(lateSender);
    //test timed get where message shows up after a short delay
    BOOST_CHECK(fix.subs.get(fetched, queueName, 5*TIME_SEC));
    BOOST_CHECK_EQUAL("foo2", fetched.getData());
    senderThread.join();
}
// A Connection whose open() threw must report !isOpen() and must still be
// usable for a subsequent open against the fixture's broker.
QPID_AUTO_TEST_CASE(testOpenFailure) {
    BrokerFixture brokerFix;
    Connection conn;
    const string badHost("unknowable-host");
    try {
        conn.open(badHost);
    } catch (const Exception&) {
        BOOST_CHECK(!conn.isOpen());
    }

    brokerFix.open(conn);
    BOOST_CHECK(conn.isOpen());

    conn.close();
    BOOST_CHECK(!conn.isOpen());
}
// With a one-second queue clean interval, sends ten messages of which every
// second one carries a TTL of 500, sleeps two seconds, and expects the
// queue's message count to have dropped from 10 to 5.
QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
    BrokerOptions opts;
    opts.queueCleanInterval = 1*TIME_SEC;
    opts.queueFlowStopRatio = 0;
    opts.queueFlowResumeRatio = 0;
    ClientSessionFixture fix(opts);

    const string queueName("my-queue");
    FieldTable queueArgs;
    queueArgs.setInt("qpid.max_count",10);
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true, arg::arguments=queueArgs);

    for (uint n = 0; n != 10; ++n) {
        const string body = (boost::format("Message_%1%") % (n+1)).str();
        Message outgoing(body, queueName);
        const bool oddIndex = (n % 2) != 0;
        if (oddIndex) {
            outgoing.getDeliveryProperties().setTtl(500);
        }
        fix.session.messageTransfer(arg::content=outgoing);
    }

    BOOST_CHECK_EQUAL(fix.session.queueQuery(queueName).getMessageCount(), 10u);
    qpid::sys::sleep(2);
    BOOST_CHECK_EQUAL(fix.session.queueQuery(queueName).getMessageCount(), 5u);
    fix.session.messageTransfer(arg::content=Message("Message_11", queueName));//ensure policy is also updated
}
// Sends ten messages of which every second one carries a TTL of 200, waits
// 300 ms, and expects successive gets to return only the messages that were
// sent without a TTL (Message_1, Message_3, ...).
QPID_AUTO_TEST_CASE(testExpirationOnPop) {
    ClientSessionFixture fix;
    const string queueName("my-queue");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);

    for (uint n = 0; n != 10; ++n) {
        Message outgoing((boost::format("Message_%1%") % (n+1)).str(), queueName);
        if ((n % 2) != 0) {
            outgoing.getDeliveryProperties().setTtl(200);
        }
        fix.session.messageTransfer(arg::content=outgoing);
    }

    qpid::sys::usleep(300* 1000);

    // Only the even-indexed messages were sent without a TTL.
    for (uint n = 0; n < 10; n += 2) {
        Message fetched;
        BOOST_CHECK(fix.subs.get(fetched, queueName, TIME_SEC));
        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (n+1)).str(), fetched.getData());
    }
}
// Receives ten messages with autoAck set to 0, cancels the subscription and
// releases the unaccepted messages, then checks that a second subscriber
// receives the same ten messages again.
QPID_AUTO_TEST_CASE(testRelease) {
    ClientSessionFixture fix;
    const string queueName("my-queue");
    const uint total = 10;
    for (uint n = 0; n != total; ++n) {
        Message outgoing((boost::format("Message_%1%") % (n+1)).str(), queueName);
        fix.session.messageTransfer(arg::content=outgoing);
    }

    fix.subs.setAutoStop(false);
    fix.subs.start();

    SubscriptionSettings settings;
    settings.autoAck = 0;
    SimpleListener firstListener;
    Subscription firstSub = fix.subs.subscribe(firstListener, queueName, settings);
    firstListener.waitFor(total);
    firstSub.cancel();
    for (uint n = 0; n != total; ++n) {
        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (n+1)).str(), firstListener.messages[n].getData());
    }
    firstSub.release(firstSub.getUnaccepted());

    //check that released messages are redelivered
    settings.autoAck = 1;
    SimpleListener secondListener;
    Subscription secondSub = fix.subs.subscribe(secondListener, queueName, settings);
    secondListener.waitFor(total);
    for (uint n = 0; n != total; ++n) {
        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (n+1)).str(), secondListener.messages[n].getData());
    }

    fix.subs.stop();
    fix.subs.wait();
    fix.session.close();
}
// With COMPLETE_ON_ACCEPT and a message window of 4, only the first four of
// eight queued messages are delivered; after accepting them and granting a
// new window of 4, the remaining four arrive.
QPID_AUTO_TEST_CASE(testCompleteOnAccept) {
    ClientSessionFixture fix;
    const string queueName("my-queue");
    const uint total = 8;
    const uint window = 4;
    for (uint n = 0; n != total; ++n) {
        Message outgoing((boost::format("Message_%1%") % (n+1)).str(), queueName);
        fix.session.messageTransfer(arg::content=outgoing);
    }

    SubscriptionSettings settings;
    settings.autoAck = 0;
    settings.completionMode = COMPLETE_ON_ACCEPT;
    settings.flowControl = FlowControl::messageWindow(window);
    LocalQueue incoming;
    Subscription sub = fix.subs.subscribe(incoming, queueName, settings);
    fix.session.messageFlush(arg::destination=sub.getName());

    // First window: messages 1..window.
    SequenceSet toAccept;
    for (uint n = 0; n != window; ++n) {
        Message fetched;
        BOOST_CHECK(incoming.get(fetched));
        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (n+1)).str(), fetched.getData());
        toAccept.add(fetched.getId());
    }
    Message surplus;
    BOOST_CHECK(!incoming.get(surplus));

    sub.accept(toAccept);
    //need to reallocate credit as we have flushed it all out
    sub.setFlowControl(FlowControl::messageWindow(window));
    fix.session.messageFlush(arg::destination=sub.getName());
    toAccept.clear();

    // Second window: messages window+1..total.
    for (uint n = window; n != total; ++n) {
        Message fetched;
        BOOST_CHECK(incoming.get(fetched));
        BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (n+1)).str(), fetched.getData());
        toAccept.add(fetched.getId());
    }
    fix.session.messageAccept(toAccept);
}
namespace
{
struct Publisher : qpid::sys::Runnable
{
AsyncSession session;
Message message;
uint count;
Thread thread;
Publisher(Connection& con, Message m, uint c) : session(con.newSession()), message(m), count(c) {}
void start()
{
thread = Thread(*this);
}
void join()
{
thread.join();
}
void run()
{
for (uint i = 0; i < count; i++) {
session.messageTransfer(arg::content=message);
}
session.sync();
session.close();
}
};
}
// Runs five Publisher threads, each sending 100 messages of 512 bytes on its
// own session over one shared connection with a 1024-byte max frame size.
// The test has no explicit assertions; it passes if nothing throws or hangs.
QPID_AUTO_TEST_CASE(testConcurrentSenders)
{
    //Ensure concurrent publishing sessions on a connection don't
    //cause assertions, deadlocks or other undesirables:
    BrokerFixture fix;
    Connection connection;
    ConnectionSettings settings;
    settings.maxFrameSize = 1024;
    settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
    connection.open(settings);
    // NOTE(review): this session is never used afterwards; kept because
    // creating it is part of the original scenario - confirm whether needed.
    AsyncSession session = connection.newSession();
    Message message(string(512, 'X'));

    typedef boost::ptr_vector<Publisher> Publishers;
    Publishers publishers;
    for (size_t i = 0; i < 5; i++) {
        publishers.push_back(new Publisher(connection, message, 100));
    }

    // Plain loops instead of std::for_each + boost::bind: <algorithm> is not
    // among the headers this file includes directly.
    for (Publishers::iterator p = publishers.begin(); p != publishers.end(); ++p) {
        p->start();
    }
    for (Publishers::iterator p = publishers.begin(); p != publishers.end(); ++p) {
        p->join();
    }
    connection.close();
}
// Once an exclusive subscription exists on a queue, a second subscribe on
// the same queue must throw ResourceLockedException.
QPID_AUTO_TEST_CASE(testExclusiveSubscribe)
{
    ClientSessionFixture fix;
    fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true);

    SubscriptionSettings exclusiveSettings;
    exclusiveSettings.exclusive = true;
    LocalQueue incoming;
    fix.subs.subscribe(incoming, "myq", exclusiveSettings, "first");

    //attempt to create new subscriber should fail
    ScopedSuppressLogging quiet;
    BOOST_CHECK_THROW(fix.subs.subscribe(incoming, "myq", "second"), ResourceLockedException);
}
// Binds two queues in turn to amq.direct under the same key with the
// "qpid.exclusive-binding" argument set. The message sent after the first
// bind must be found only on queue-1, and the message sent after the second
// bind only on queue-2.
QPID_AUTO_TEST_CASE(testExclusiveBinding) {
    FieldTable bindArgs;
    bindArgs.setString("qpid.exclusive-binding", "anything");

    ClientSessionFixture fix;
    const string firstQueue("queue-1");
    const string secondQueue("queue-2");
    fix.session.queueDeclare(arg::queue=firstQueue, arg::exclusive=true, arg::autoDelete=true);
    fix.session.queueDeclare(arg::queue=secondQueue, arg::exclusive=true, arg::autoDelete=true);

    fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue=firstQueue, arg::bindingKey="my-key", arg::arguments=bindArgs);
    fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message1", "my-key"));
    fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue=secondQueue, arg::bindingKey="my-key", arg::arguments=bindArgs);
    fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message2", "my-key"));

    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, firstQueue));
    BOOST_CHECK_EQUAL("message1", fetched.getData());
    BOOST_CHECK(!fix.subs.get(fetched, firstQueue));

    BOOST_CHECK(fix.subs.get(fetched, secondQueue));
    BOOST_CHECK_EQUAL("message2", fetched.getData());
    BOOST_CHECK(!fix.subs.get(fetched, secondQueue));
}
// After cancelling a subscription and subscribing again under the same name
// with a different LocalQueue, a delivery must land in the new LocalQueue
// and not in the old one.
QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) {
    ClientSessionFixture fix;
    const string queueName("some-queue");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);

    LocalQueue cancelled, current;
    fix.subs.subscribe(cancelled, queueName);
    fix.subs.cancel(queueName);
    fix.subs.subscribe(current, queueName);

    fix.session.messageTransfer(arg::content=Message("some-data", queueName));
    fix.session.messageFlush(arg::destination=queueName);

    Message fetched;
    BOOST_CHECK(!cancelled.get(fetched));
    BOOST_CHECK(current.get(fetched));
    BOOST_CHECK_EQUAL("some-data", fetched.getData());
    BOOST_CHECK(!current.get(fetched));
}
// Two consumers on separate connections subscribe to the same queue; the
// first is given a default-constructed FlowControl, the second uses the
// default subscription settings. A message sent afterwards must reach the
// second consumer within one second.
QPID_AUTO_TEST_CASE(testReliableDispatch) {
    ClientSessionFixture fix;
    std::string queueName("a-queue");
    fix.session.queueDeclare(arg::queue=queueName, arg::autoDelete=true);

    ConnectionSettings settings;
    settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);

    Connection starvedConn;
    starvedConn.open(settings);
    Session starvedSession = starvedConn.newSession();
    SubscriptionManager starvedSubs(starvedSession);
    LocalQueue starvedQueue;
    starvedSubs.subscribe(starvedQueue, queueName, FlowControl());//first subscriber has no credit

    Connection activeConn;
    activeConn.open(settings);
    Session activeSession = activeConn.newSession();
    SubscriptionManager activeSubs(activeSession);
    LocalQueue activeQueue;
    activeSubs.subscribe(activeQueue, queueName);//second subscriber has credit

    fix.session.messageTransfer(arg::content=Message("my-message", queueName));

    //check that the second consumer gets the message
    Message fetched;
    BOOST_CHECK(activeQueue.get(fetched, 1*TIME_SEC));
    BOOST_CHECK_EQUAL("my-message", fetched.getData());

    starvedConn.close();
    activeConn.close();
}
// Calling close() on a default-constructed Session must not throw; the test
// has no other assertion.
QPID_AUTO_TEST_CASE(testSessionCloseOnInvalidSession) {
    Session unattached;
    unattached.close();
}
// Declares a queue ordered as LVQ_NO_BROWSE and sends ten messages that all
// carry the same LVQ key value ("abc") but shrink in size from 100 down to
// 10 bytes. The test has no explicit assertions; it passes if no transfer
// throws.
QPID_AUTO_TEST_CASE(testLVQVariedSize) {
    ClientSessionFixture fix;
    std::string queue("my-lvq");
    QueueOptions args;
    args.setOrdering(LVQ_NO_BROWSE);
    fix.session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);

    // Header name under which the LVQ key value is stored, as reported by QueueOptions.
    std::string key;
    args.getLVQKey(key);

    for (size_t i = 0; i < 10; i++) {
        // 100, 90, ..., 10 bytes of 'x'. The payload is built directly as a
        // std::string, so no string stream (and no <sstream>) is needed.
        const size_t size = 100 - (i * 10);
        Message m(std::string(size, 'x'), queue);
        m.getHeaders().setString(key, "abc");
        fix.session.messageTransfer(arg::content=m);
    }
}
// Subscribes with a default-constructed FlowControl, sends one message, then
// calls SubscriptionManager::setFlowControl(name, 1, UNLIMITED, false) and
// expects the message to be available immediately after a flush.
QPID_AUTO_TEST_CASE(testSessionManagerSetFlowControl) {
    ClientSessionFixture fix;
    std::string queueName("dummy");
    LocalQueue incoming;

    SubscriptionSettings settings;
    settings.flowControl = FlowControl();

    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);
    fix.subs.subscribe(incoming, queueName, settings);
    fix.session.messageTransfer(arg::content=Message("my-message", queueName));

    fix.subs.setFlowControl(queueName, 1, FlowControl::UNLIMITED, false);
    fix.session.messageFlush(queueName);

    Message fetched;
    BOOST_CHECK(incoming.get(fetched, 0));
    BOOST_CHECK_EQUAL("my-message", fetched.getData());
}
// Fetches the first of two queued messages with get(), then runs a
// DummyListener on the same queue and expects it to receive the second.
QPID_AUTO_TEST_CASE(testGetThenSubscribe) {
    ClientSessionFixture fix;
    std::string queueName("myqueue");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);
    fix.session.messageTransfer(arg::content=Message("one", queueName));
    fix.session.messageTransfer(arg::content=Message("two", queueName));

    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, queueName));
    BOOST_CHECK_EQUAL("one", fetched.getData());

    DummyListener listener(fix.session, queueName, 1);
    listener.run();
    BOOST_CHECK_EQUAL(1u, listener.messages.size());

    // Guard the index so a failed size check above cannot read out of range.
    const bool gotSomething = !listener.messages.empty();
    if (gotSomething) {
        BOOST_CHECK_EQUAL("two", listener.messages[0].getData());
    }
}
// isValid() must be true for the fixture's session and false for a
// default-constructed Session.
QPID_AUTO_TEST_CASE(testSessionIsValid) {
    ClientSessionFixture fix;
    BOOST_CHECK(fix.session.isValid());

    Session unattached;
    BOOST_CHECK(!unattached.isValid());
}
// A message sent with both a TTL and an explicit expiration value must come
// back with the expiration value unchanged.
QPID_AUTO_TEST_CASE(testExpirationNotAltered) {
    ClientSessionFixture fix;
    const string queueName("my-queue");
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);

    Message outgoing("my-message", queueName);
    outgoing.getDeliveryProperties().setTtl(60000);
    outgoing.getDeliveryProperties().setExpiration(12345);
    fix.session.messageTransfer(arg::content=outgoing);

    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, queueName));
    BOOST_CHECK_EQUAL("my-message", fetched.getData());
    BOOST_CHECK_EQUAL(12345u, fetched.getDeliveryProperties().getExpiration());
}
// Checks that Session::getConnection() returns the session's own connection,
// using a queue declared with the "no-local" argument as the detector.
QPID_AUTO_TEST_CASE(testGetConnectionFromSession) {
    ClientSessionFixture fix;
    FieldTable noLocalArgs;
    noLocalArgs.setInt("no-local", 1);
    fix.session.queueDeclare(arg::queue="a", arg::exclusive=true, arg::autoDelete=true, arg::arguments=noLocalArgs);
    fix.session.queueDeclare(arg::queue="b", arg::exclusive=true, arg::autoDelete=true);

    Connection sameConnection = fix.session.getConnection();
    Session sibling = sameConnection.newSession();
    //If this new session was created as expected on the same connection as
    //fix.session, then the no-local behaviour means that queue 'a'
    //will not enqueue messages from this new session but queue 'b'
    //will.
    sibling.messageTransfer(arg::content=Message("a", "a"));
    sibling.messageTransfer(arg::content=Message("b", "b"));

    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, "b"));
    BOOST_CHECK_EQUAL("b", fetched.getData());
    BOOST_CHECK(!fix.subs.get(fetched, "a"));
}
// Deleting a queue that has a LocalQueue subscriber must make a subsequent
// timed get on that LocalQueue throw ResourceDeletedException.
QPID_AUTO_TEST_CASE(testQueueDeleted)
{
    ClientSessionFixture fix;
    const string queueName("my-queue");
    fix.session.queueDeclare(arg::queue=queueName);

    LocalQueue incoming;
    fix.subs.subscribe(incoming, queueName);

    ScopedSuppressLogging quiet;
    fix.session.queueDelete(arg::queue=queueName);
    BOOST_CHECK_THROW(incoming.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);
}
// Sends "AAA" with a 2 s TTL and "BBB" with a 10 s TTL, sleeps 5 s, and
// expects only "BBB" to be delivered, with a remaining TTL strictly between
// 1 s and 5 s.
QPID_AUTO_TEST_CASE(testTtl)
{
    const uint64_t msPerSec = 1000ULL;            // convert sec to ms
    const uint64_t usPerSec = 1000ULL * 1000ULL;  // convert sec to us
    const uint64_t shortTtl = 2 * msPerSec;       // 2 sec
    const uint64_t longTtl = 10 * msPerSec;       // 10 sec
    const uint64_t pauseSecs = 5;
    const string queueName("ttl-test");

    ClientSessionFixture fix;
    fix.session.queueDeclare(arg::queue=queueName, arg::exclusive=true, arg::autoDelete=true);

    Message expiring("AAA", queueName);
    expiring.getDeliveryProperties().setTtl(shortTtl);
    Connection sameConnection = fix.session.getConnection();
    Session sender = sameConnection.newSession();
    sender.messageTransfer(arg::content=expiring);

    Message surviving("BBB", queueName);
    surviving.getDeliveryProperties().setTtl(longTtl);
    sender.messageTransfer(arg::content=surviving);

    qpid::sys::usleep(pauseSecs * usPerSec); // 5 sec

    // Message "AAA" should be expired and never be delivered
    // Check "BBB" has ttl somewhere between 1 and 5 secs
    Message fetched;
    BOOST_CHECK(fix.subs.get(fetched, queueName));
    BOOST_CHECK_EQUAL("BBB", fetched.getData());
    BOOST_CHECK(fetched.getDeliveryProperties().getTtl() > 1 * msPerSec);
    BOOST_CHECK(fetched.getDeliveryProperties().getTtl() < longTtl - (pauseSecs * msPerSec));
}
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests