| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "unit_test.h" |
| #include "test_tools.h" |
| #include "BrokerFixture.h" |
| #include "qpid/client/QueueOptions.h" |
| #include "qpid/client/MessageListener.h" |
| #include "qpid/client/SubscriptionManager.h" |
| #include "qpid/client/AsyncSession.h" |
| #include "qpid/sys/Monitor.h" |
| #include "qpid/sys/Thread.h" |
| #include "qpid/sys/Runnable.h" |
| #include "qpid/sys/Time.h" |
| #include "qpid/client/Session.h" |
| #include "qpid/client/Message.h" |
| #include "qpid/framing/reply_exceptions.h" |
| |
| #include <boost/optional.hpp> |
| #include <boost/lexical_cast.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/ptr_container/ptr_vector.hpp> |
| |
| #include <vector> |
| |
| namespace qpid { |
| namespace tests { |
| |
| QPID_AUTO_TEST_SUITE(ClientSessionTest) |
| |
| using namespace qpid::client; |
| using namespace qpid::framing; |
| using namespace qpid; |
| using qpid::sys::Monitor; |
| using qpid::sys::Thread; |
| using qpid::sys::TIME_SEC; |
| using qpid::broker::Broker; |
| using std::string; |
| using std::cout; |
| using std::endl; |
| |
| |
| struct DummyListener : public sys::Runnable, public MessageListener { |
| std::vector<Message> messages; |
| string name; |
| uint expected; |
| SubscriptionManager submgr; |
| |
| DummyListener(Session& session, const string& n, uint ex) : |
| name(n), expected(ex), submgr(session) {} |
| |
| void run() |
| { |
| submgr.subscribe(*this, name); |
| submgr.run(); |
| } |
| |
| void received(Message& msg) |
| { |
| messages.push_back(msg); |
| if (--expected == 0) { |
| submgr.stop(); |
| } |
| } |
| }; |
| |
| struct SimpleListener : public MessageListener |
| { |
| Monitor lock; |
| std::vector<Message> messages; |
| |
| void received(Message& msg) |
| { |
| Monitor::ScopedLock l(lock); |
| messages.push_back(msg); |
| lock.notifyAll(); |
| } |
| |
| void waitFor(const uint n) |
| { |
| Monitor::ScopedLock l(lock); |
| while (messages.size() < n) { |
| lock.wait(); |
| } |
| } |
| }; |
| |
| struct ClientSessionFixture : public SessionFixture |
| { |
| ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) { |
| session.queueDeclare(arg::queue="my-queue"); |
| } |
| }; |
| |
| QPID_AUTO_TEST_CASE(testQueueQuery) { |
| ClientSessionFixture fix; |
| fix.session = fix.connection.newSession(); |
| fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", |
| arg::exclusive=true, arg::autoDelete=true); |
| QueueQueryResult result = fix.session.queueQuery("q"); |
| BOOST_CHECK_EQUAL(false, result.getDurable()); |
| BOOST_CHECK_EQUAL(true, result.getExclusive()); |
| BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testDispatcher) |
| { |
| ClientSessionFixture fix; |
| fix.session =fix.connection.newSession(); |
| size_t count = 100; |
| for (size_t i = 0; i < count; ++i) |
| fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); |
| DummyListener listener(fix.session, "my-queue", count); |
| listener.run(); |
| BOOST_CHECK_EQUAL(count, listener.messages.size()); |
| for (size_t i = 0; i < count; ++i) |
| BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testDispatcherThread) |
| { |
| ClientSessionFixture fix; |
| fix.session =fix.connection.newSession(); |
| size_t count = 10; |
| DummyListener listener(fix.session, "my-queue", count); |
| sys::Thread t(listener); |
| for (size_t i = 0; i < count; ++i) { |
| fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); |
| } |
| t.join(); |
| BOOST_CHECK_EQUAL(count, listener.messages.size()); |
| for (size_t i = 0; i < count; ++i) |
| BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testUseSuspendedError) |
| { |
| ClientSessionFixture fix; |
| fix.session.timeout(60); |
| fix.session.suspend(); |
| try { |
| fix.session.exchangeQuery(arg::exchange="amq.fanout"); |
| BOOST_FAIL("Expected session suspended exception"); |
| } catch(const NotAttachedException&) {} |
| } |
| |
| QPID_AUTO_TEST_CASE(testSendToSelf) { |
| ClientSessionFixture fix; |
| SimpleListener mylistener; |
| fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); |
| fix.subs.subscribe(mylistener, "myq"); |
| sys::Thread runner(fix.subs);//start dispatcher thread |
| string data("msg"); |
| Message msg(data, "myq"); |
| const uint count=10; |
| for (uint i = 0; i < count; ++i) { |
| fix.session.messageTransfer(arg::content=msg); |
| } |
| mylistener.waitFor(count); |
| fix.subs.cancel("myq"); |
| fix.subs.stop(); |
| runner.join(); |
| fix.session.close(); |
| BOOST_CHECK_EQUAL(mylistener.messages.size(), count); |
| for (uint j = 0; j < count; ++j) { |
| BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testLocalQueue) { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="lq", arg::exclusive=true, arg::autoDelete=true); |
| LocalQueue lq; |
| fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false)); |
| fix.session.messageTransfer(arg::content=Message("foo0", "lq")); |
| fix.session.messageTransfer(arg::content=Message("foo1", "lq")); |
| fix.session.messageTransfer(arg::content=Message("foo2", "lq")); |
| BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); |
| BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); |
| BOOST_CHECK(lq.empty()); // Credit exhausted. |
| fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); |
| BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); |
| } |
| |
| struct DelayedTransfer : sys::Runnable |
| { |
| ClientSessionFixture& fixture; |
| |
| DelayedTransfer(ClientSessionFixture& f) : fixture(f) {} |
| |
| void run() |
| { |
| qpid::sys::sleep(1); |
| fixture.session.messageTransfer(arg::content=Message("foo2", "getq")); |
| } |
| }; |
| |
| QPID_AUTO_TEST_CASE(testGet) { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="getq", arg::exclusive=true, arg::autoDelete=true); |
| fix.session.messageTransfer(arg::content=Message("foo0", "getq")); |
| fix.session.messageTransfer(arg::content=Message("foo1", "getq")); |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); |
| BOOST_CHECK_EQUAL("foo0", got.getData()); |
| BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); |
| BOOST_CHECK_EQUAL("foo1", got.getData()); |
| BOOST_CHECK(!fix.subs.get(got, "getq")); |
| DelayedTransfer sender(fix); |
| Thread t(sender); |
| //test timed get where message shows up after a short delay |
| BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC)); |
| BOOST_CHECK_EQUAL("foo2", got.getData()); |
| t.join(); |
| } |
| |
| QPID_AUTO_TEST_CASE(testOpenFailure) { |
| BrokerFixture b; |
| Connection c; |
| string host("unknowable-host"); |
| try { |
| c.open(host); |
| } catch (const Exception&) { |
| BOOST_CHECK(!c.isOpen()); |
| } |
| b.open(c); |
| BOOST_CHECK(c.isOpen()); |
| c.close(); |
| BOOST_CHECK(!c.isOpen()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testPeriodicExpiration) { |
| Broker::Options opts; |
| opts.queueCleanInterval = 1; |
| opts.queueFlowStopRatio = 0; |
| opts.queueFlowResumeRatio = 0; |
| ClientSessionFixture fix(opts); |
| FieldTable args; |
| args.setInt("qpid.max_count",10); |
| fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); |
| |
| for (uint i = 0; i < 10; i++) { |
| Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); |
| if (i % 2) m.getDeliveryProperties().setTtl(500); |
| fix.session.messageTransfer(arg::content=m); |
| } |
| |
| BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); |
| qpid::sys::sleep(2); |
| BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); |
| fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated |
| } |
| |
| QPID_AUTO_TEST_CASE(testExpirationOnPop) { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); |
| |
| for (uint i = 0; i < 10; i++) { |
| Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); |
| if (i % 2) m.getDeliveryProperties().setTtl(200); |
| fix.session.messageTransfer(arg::content=m); |
| } |
| |
| qpid::sys::usleep(300* 1000); |
| |
| for (uint i = 0; i < 10; i++) { |
| if (i % 2) continue; |
| Message m; |
| BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC)); |
| BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testRelease) { |
| ClientSessionFixture fix; |
| |
| const uint count=10; |
| for (uint i = 0; i < count; i++) { |
| Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); |
| fix.session.messageTransfer(arg::content=m); |
| } |
| |
| fix.subs.setAutoStop(false); |
| fix.subs.start(); |
| SubscriptionSettings settings; |
| settings.autoAck = 0; |
| |
| SimpleListener l1; |
| Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings); |
| l1.waitFor(count); |
| s1.cancel(); |
| |
| for (uint i = 0; i < count; i++) { |
| BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData()); |
| } |
| s1.release(s1.getUnaccepted()); |
| |
| //check that released messages are redelivered |
| settings.autoAck = 1; |
| SimpleListener l2; |
| Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings); |
| l2.waitFor(count); |
| for (uint i = 0; i < count; i++) { |
| BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData()); |
| } |
| |
| fix.subs.stop(); |
| fix.subs.wait(); |
| fix.session.close(); |
| } |
| |
| QPID_AUTO_TEST_CASE(testCompleteOnAccept) { |
| ClientSessionFixture fix; |
| const uint count = 8; |
| const uint chunk = 4; |
| for (uint i = 0; i < count; i++) { |
| Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); |
| fix.session.messageTransfer(arg::content=m); |
| } |
| |
| SubscriptionSettings settings; |
| settings.autoAck = 0; |
| settings.completionMode = COMPLETE_ON_ACCEPT; |
| settings.flowControl = FlowControl::messageWindow(chunk); |
| |
| LocalQueue q; |
| Subscription s = fix.subs.subscribe(q, "my-queue", settings); |
| fix.session.messageFlush(arg::destination=s.getName()); |
| SequenceSet accepted; |
| for (uint i = 0; i < chunk; i++) { |
| Message m; |
| BOOST_CHECK(q.get(m)); |
| BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); |
| accepted.add(m.getId()); |
| } |
| Message m; |
| BOOST_CHECK(!q.get(m)); |
| |
| s.accept(accepted); |
| fix.session.messageFlush(arg::destination=s.getName()); |
| accepted.clear(); |
| |
| for (uint i = chunk; i < count; i++) { |
| Message m; |
| BOOST_CHECK(q.get(m)); |
| BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); |
| accepted.add(m.getId()); |
| } |
| fix.session.messageAccept(accepted); |
| } |
| |
| namespace |
| { |
| struct Publisher : qpid::sys::Runnable |
| { |
| AsyncSession session; |
| Message message; |
| uint count; |
| Thread thread; |
| |
| Publisher(Connection& con, Message m, uint c) : session(con.newSession()), message(m), count(c) {} |
| |
| void start() |
| { |
| thread = Thread(*this); |
| } |
| |
| void join() |
| { |
| thread.join(); |
| } |
| |
| void run() |
| { |
| for (uint i = 0; i < count; i++) { |
| session.messageTransfer(arg::content=message); |
| } |
| session.sync(); |
| session.close(); |
| } |
| }; |
| } |
| |
| QPID_AUTO_TEST_CASE(testConcurrentSenders) |
| { |
| //Ensure concurrent publishing sessions on a connection don't |
| //cause assertions, deadlocks or other undesirables: |
| BrokerFixture fix; |
| Connection connection; |
| ConnectionSettings settings; |
| settings.maxFrameSize = 1024; |
| settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); |
| connection.open(settings); |
| AsyncSession session = connection.newSession(); |
| Message message(string(512, 'X')); |
| |
| boost::ptr_vector<Publisher> publishers; |
| for (size_t i = 0; i < 5; i++) { |
| publishers.push_back(new Publisher(connection, message, 100)); |
| } |
| std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::start, _1)); |
| std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::join, _1)); |
| connection.close(); |
| } |
| |
| |
| QPID_AUTO_TEST_CASE(testExclusiveSubscribe) |
| { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); |
| SubscriptionSettings settings; |
| settings.exclusive = true; |
| LocalQueue q; |
| fix.subs.subscribe(q, "myq", settings, "first"); |
| //attempt to create new subscriber should fail |
| ScopedSuppressLogging sl; |
| BOOST_CHECK_THROW(fix.subs.subscribe(q, "myq", "second"), ResourceLockedException); |
| ; |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testExclusiveBinding) { |
| FieldTable options; |
| options.setString("qpid.exclusive-binding", "anything"); |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="queue-1", arg::exclusive=true, arg::autoDelete=true); |
| fix.session.queueDeclare(arg::queue="queue-2", arg::exclusive=true, arg::autoDelete=true); |
| fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-1", arg::bindingKey="my-key", arg::arguments=options); |
| fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message1", "my-key")); |
| fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-2", arg::bindingKey="my-key", arg::arguments=options); |
| fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message2", "my-key")); |
| |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, "queue-1")); |
| BOOST_CHECK_EQUAL("message1", got.getData()); |
| BOOST_CHECK(!fix.subs.get(got, "queue-1")); |
| |
| BOOST_CHECK(fix.subs.get(got, "queue-2")); |
| BOOST_CHECK_EQUAL("message2", got.getData()); |
| BOOST_CHECK(!fix.subs.get(got, "queue-2")); |
| } |
| |
| QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true); |
| LocalQueue p, q; |
| fix.subs.subscribe(p, "some-queue"); |
| fix.subs.cancel("some-queue"); |
| fix.subs.subscribe(q, "some-queue"); |
| |
| fix.session.messageTransfer(arg::content=Message("some-data", "some-queue")); |
| fix.session.messageFlush(arg::destination="some-queue"); |
| |
| Message got; |
| BOOST_CHECK(!p.get(got)); |
| |
| BOOST_CHECK(q.get(got)); |
| BOOST_CHECK_EQUAL("some-data", got.getData()); |
| BOOST_CHECK(!q.get(got)); |
| } |
| |
| QPID_AUTO_TEST_CASE(testReliableDispatch) { |
| ClientSessionFixture fix; |
| std::string queue("a-queue"); |
| fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true); |
| |
| ConnectionSettings settings; |
| settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); |
| |
| Connection c1; |
| c1.open(settings); |
| Session s1 = c1.newSession(); |
| SubscriptionManager subs1(s1); |
| LocalQueue q1; |
| subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit |
| |
| Connection c2; |
| c2.open(settings); |
| Session s2 = c2.newSession(); |
| SubscriptionManager subs2(s2); |
| LocalQueue q2; |
| subs2.subscribe(q2, queue);//second subscriber has credit |
| |
| fix.session.messageTransfer(arg::content=Message("my-message", queue)); |
| |
| //check that the second consumer gets the message |
| Message got; |
| BOOST_CHECK(q2.get(got, 1*TIME_SEC)); |
| BOOST_CHECK_EQUAL("my-message", got.getData()); |
| |
| c1.close(); |
| c2.close(); |
| } |
| |
| QPID_AUTO_TEST_CASE(testSessionCloseOnInvalidSession) { |
| Session session; |
| session.close(); |
| } |
| |
| QPID_AUTO_TEST_CASE(testLVQVariedSize) { |
| ClientSessionFixture fix; |
| std::string queue("my-lvq"); |
| QueueOptions args; |
| args.setOrdering(LVQ_NO_BROWSE); |
| fix.session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); |
| |
| std::string key; |
| args.getLVQKey(key); |
| |
| for (size_t i = 0; i < 10; i++) { |
| std::ostringstream data; |
| size_t size = 100 - ((i % 10) * 10); |
| data << std::string(size, 'x'); |
| |
| Message m(data.str(), queue); |
| m.getHeaders().setString(key, "abc"); |
| fix.session.messageTransfer(arg::content=m); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testSessionManagerSetFlowControl) { |
| ClientSessionFixture fix; |
| std::string name("dummy"); |
| LocalQueue queue; |
| SubscriptionSettings settings; |
| settings.flowControl = FlowControl(); |
| fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); |
| fix.subs.subscribe(queue, name, settings); |
| fix.session.messageTransfer(arg::content=Message("my-message", name)); |
| fix.subs.setFlowControl(name, 1, FlowControl::UNLIMITED, false); |
| fix.session.messageFlush(name); |
| Message got; |
| BOOST_CHECK(queue.get(got, 0)); |
| BOOST_CHECK_EQUAL("my-message", got.getData()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testGetThenSubscribe) { |
| ClientSessionFixture fix; |
| std::string name("myqueue"); |
| fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); |
| fix.session.messageTransfer(arg::content=Message("one", name)); |
| fix.session.messageTransfer(arg::content=Message("two", name)); |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, name)); |
| BOOST_CHECK_EQUAL("one", got.getData()); |
| |
| DummyListener listener(fix.session, name, 1); |
| listener.run(); |
| BOOST_CHECK_EQUAL(1u, listener.messages.size()); |
| if (!listener.messages.empty()) { |
| BOOST_CHECK_EQUAL("two", listener.messages[0].getData()); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testSessionIsValid) { |
| ClientSessionFixture fix; |
| BOOST_CHECK(fix.session.isValid()); |
| Session session; |
| BOOST_CHECK(!session.isValid()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testExpirationNotAltered) { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); |
| |
| Message m("my-message", "my-queue"); |
| m.getDeliveryProperties().setTtl(60000); |
| m.getDeliveryProperties().setExpiration(12345); |
| fix.session.messageTransfer(arg::content=m); |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, "my-queue")); |
| BOOST_CHECK_EQUAL("my-message", got.getData()); |
| BOOST_CHECK_EQUAL(12345u, got.getDeliveryProperties().getExpiration()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testGetConnectionFromSession) { |
| ClientSessionFixture fix; |
| FieldTable options; |
| options.setInt("no-local", 1); |
| fix.session.queueDeclare(arg::queue="a", arg::exclusive=true, arg::autoDelete=true, arg::arguments=options); |
| fix.session.queueDeclare(arg::queue="b", arg::exclusive=true, arg::autoDelete=true); |
| |
| Connection c = fix.session.getConnection(); |
| Session s = c.newSession(); |
| //If this new session was created as expected on the same connection as |
| //fix.session, then the no-local behaviour means that queue 'a' |
| //will not enqueue messages from this new session but queue 'b' |
| //will. |
| s.messageTransfer(arg::content=Message("a", "a")); |
| s.messageTransfer(arg::content=Message("b", "b")); |
| |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, "b")); |
| BOOST_CHECK_EQUAL("b", got.getData()); |
| BOOST_CHECK(!fix.subs.get(got, "a")); |
| } |
| |
| |
| QPID_AUTO_TEST_CASE(testQueueDeleted) |
| { |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="my-queue"); |
| LocalQueue queue; |
| fix.subs.subscribe(queue, "my-queue"); |
| |
| ScopedSuppressLogging sl; |
| fix.session.queueDelete(arg::queue="my-queue"); |
| BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); |
| } |
| |
| QPID_AUTO_TEST_CASE(testTtl) |
| { |
| const uint64_t ms = 1000ULL; // convert sec to ms |
| const uint64_t us = 1000ULL * 1000ULL; // convert sec to us |
| |
| ClientSessionFixture fix; |
| fix.session.queueDeclare(arg::queue="ttl-test", arg::exclusive=true, arg::autoDelete=true); |
| Message msg1 = Message("AAA", "ttl-test"); |
| uint64_t ttl = 2 * ms; // 2 sec |
| msg1.getDeliveryProperties().setTtl(ttl); |
| Connection c = fix.session.getConnection(); |
| Session s = c.newSession(); |
| s.messageTransfer(arg::content=msg1); |
| |
| Message msg2 = Message("BBB", "ttl-test"); |
| ttl = 10 * ms; // 10 sec |
| msg2.getDeliveryProperties().setTtl(ttl); |
| s.messageTransfer(arg::content=msg2); |
| |
| qpid::sys::usleep(5 * us); // 5 sec |
| |
| // Message "AAA" should be expired and never be delivered |
| // Check "BBB" has ttl somewhere between 1 and 5 secs |
| Message got; |
| BOOST_CHECK(fix.subs.get(got, "ttl-test")); |
| BOOST_CHECK_EQUAL("BBB", got.getData()); |
| BOOST_CHECK(got.getDeliveryProperties().getTtl() > 1 * ms); |
| BOOST_CHECK(got.getDeliveryProperties().getTtl() < ttl - (5 * ms)); |
| } |
| |
| QPID_AUTO_TEST_SUITE_END() |
| |
| }} // namespace qpid::tests |