| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "MessageUtils.h" |
| #include "unit_test.h" |
| #include "test_tools.h" |
| #include "qpid/Exception.h" |
| #include "qpid/broker/Broker.h" |
| #include "qpid/broker/DeliverableMessage.h" |
| #include "qpid/broker/FanOutExchange.h" |
| #include "qpid/broker/Queue.h" |
| #include "qpid/broker/Deliverable.h" |
| #include "qpid/broker/ExchangeRegistry.h" |
| #include "qpid/broker/QueueRegistry.h" |
| #include "qpid/broker/NullMessageStore.h" |
| #include "qpid/broker/ExpiryPolicy.h" |
| #include "qpid/framing/MessageTransferBody.h" |
| #include "qpid/client/QueueOptions.h" |
| #include "qpid/framing/AMQFrame.h" |
| #include "qpid/framing/MessageTransferBody.h" |
| #include "qpid/framing/reply_exceptions.h" |
| #include "qpid/broker/QueuePolicy.h" |
| #include "qpid/broker/QueueFlowLimit.h" |
| |
| #include <iostream> |
| #include "boost/format.hpp" |
| |
| using boost::intrusive_ptr; |
| using namespace qpid; |
| using namespace qpid::broker; |
| using namespace qpid::client; |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| |
| namespace qpid { |
| namespace tests { |
| |
| class TestConsumer : public virtual Consumer{ |
| public: |
| typedef boost::shared_ptr<TestConsumer> shared_ptr; |
| |
| QueuedMessage last; |
| bool received; |
| TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; |
| |
| virtual bool deliver(QueuedMessage& msg){ |
| last = msg; |
| received = true; |
| return true; |
| }; |
| void notify() {} |
| OwnershipToken* getSession() { return 0; } |
| }; |
| |
| class FailOnDeliver : public Deliverable |
| { |
| boost::intrusive_ptr<Message> msg; |
| public: |
| FailOnDeliver() : msg(MessageUtils::createMessage()) {} |
| void deliverTo(const boost::shared_ptr<Queue>& queue) |
| { |
| throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); |
| } |
| Message& getMessage() { return *(msg.get()); } |
| }; |
| |
| intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { |
| intrusive_ptr<Message> msg(new Message()); |
| AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); |
| AMQFrame header((AMQHeaderBody())); |
| msg->getFrames().append(method); |
| msg->getFrames().append(header); |
| msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); |
| if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); |
| return msg; |
| } |
| |
| QPID_AUTO_TEST_SUITE(QueueTestSuite) |
| |
| QPID_AUTO_TEST_CASE(testAsyncMessage) { |
| Queue::shared_ptr queue(new Queue("my_test_queue", true)); |
| intrusive_ptr<Message> received; |
| |
| TestConsumer::shared_ptr c1(new TestConsumer()); |
| queue->consume(c1); |
| |
| |
| //Test basic delivery: |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process |
| queue->process(msg1); |
| sleep(2); |
| |
| BOOST_CHECK(!c1->received); |
| msg1->enqueueComplete(); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg1.get(), received.get()); |
| } |
| |
| |
| QPID_AUTO_TEST_CASE(testAsyncMessageCount){ |
| Queue::shared_ptr queue(new Queue("my_test_queue", true)); |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process |
| |
| queue->process(msg1); |
| sleep(2); |
| uint32_t compval=0; |
| BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); |
| msg1->enqueueComplete(); |
| compval=1; |
| BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); |
| BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testConsumers){ |
| Queue::shared_ptr queue(new Queue("my_queue", true)); |
| |
| //Test adding consumers: |
| TestConsumer::shared_ptr c1(new TestConsumer()); |
| TestConsumer::shared_ptr c2(new TestConsumer()); |
| queue->consume(c1); |
| queue->consume(c2); |
| |
| BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); |
| |
| //Test basic delivery: |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| |
| queue->deliver(msg1); |
| BOOST_CHECK(queue->dispatch(c1)); |
| BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); |
| |
| queue->deliver(msg2); |
| BOOST_CHECK(queue->dispatch(c2)); |
| BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); |
| |
| c1->received = false; |
| queue->deliver(msg3); |
| BOOST_CHECK(queue->dispatch(c1)); |
| BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); |
| |
| //Test cancellation: |
| queue->cancel(c1); |
| BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); |
| queue->cancel(c2); |
| BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount()); |
| } |
| |
| QPID_AUTO_TEST_CASE(testRegistry){ |
| //Test use of queues in registry: |
| QueueRegistry registry; |
| registry.declare("queue1", true, true); |
| registry.declare("queue2", true, true); |
| registry.declare("queue3", true, true); |
| |
| BOOST_CHECK(registry.find("queue1")); |
| BOOST_CHECK(registry.find("queue2")); |
| BOOST_CHECK(registry.find("queue3")); |
| |
| registry.destroy("queue1"); |
| registry.destroy("queue2"); |
| registry.destroy("queue3"); |
| |
| BOOST_CHECK(!registry.find("queue1")); |
| BOOST_CHECK(!registry.find("queue2")); |
| BOOST_CHECK(!registry.find("queue3")); |
| } |
| |
| QPID_AUTO_TEST_CASE(testDequeue){ |
| Queue::shared_ptr queue(new Queue("my_queue", true)); |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| intrusive_ptr<Message> received; |
| |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| queue->deliver(msg3); |
| |
| BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg1.get(), received.get()); |
| BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg2.get(), received.get()); |
| BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); |
| |
| TestConsumer::shared_ptr consumer(new TestConsumer()); |
| queue->consume(consumer); |
| queue->dispatch(consumer); |
| if (!consumer->received) |
| sleep(2); |
| |
| BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); |
| BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK(!received); |
| BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testBound){ |
| //test the recording of bindings, and use of those to allow a queue to be unbound |
| string key("my-key"); |
| FieldTable args; |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true)); |
| ExchangeRegistry exchanges; |
| //establish bindings from exchange->queue and notify the queue as it is bound: |
| Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; |
| exchange1->bind(queue, key, &args); |
| queue->bound(exchange1->getName(), key, args); |
| |
| Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; |
| exchange2->bind(queue, key, &args); |
| queue->bound(exchange2->getName(), key, args); |
| |
| Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; |
| exchange3->bind(queue, key, &args); |
| queue->bound(exchange3->getName(), key, args); |
| |
| //delete one of the exchanges: |
| exchanges.destroy(exchange2->getName()); |
| exchange2.reset(); |
| |
| //unbind the queue from all exchanges it knows it has been bound to: |
| queue->unbind(exchanges); |
| |
| //ensure the remaining exchanges don't still have the queue bound to them: |
| FailOnDeliver deliverable; |
| exchange1->route(deliverable, key, &args); |
| exchange3->route(deliverable, key, &args); |
| } |
| |
| QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ |
| client::QueueOptions args; |
| args.setPersistLastNode(); |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true)); |
| queue->configure(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| |
| //enqueue 2 messages |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| |
| //change mode |
| queue->setLastNodeFailure(); |
| |
| //enqueue 1 message |
| queue->deliver(msg3); |
| |
| //check all have persistent ids. |
| BOOST_CHECK(msg1->isPersistent()); |
| BOOST_CHECK(msg2->isPersistent()); |
| BOOST_CHECK(msg3->isPersistent()); |
| |
| } |
| |
| |
| QPID_AUTO_TEST_CASE(testSeek){ |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true)); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| |
| //enqueue 2 messages |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| queue->deliver(msg3); |
| |
| TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); |
| SequenceNumber seq(2); |
| consumer->position = seq; |
| |
| QueuedMessage qm; |
| queue->dispatch(consumer); |
| |
| BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); |
| queue->dispatch(consumer); |
| queue->dispatch(consumer); // make sure over-run is safe |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testSearch){ |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true)); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| |
| //enqueue 2 messages |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| queue->deliver(msg3); |
| |
| SequenceNumber seq(2); |
| QueuedMessage qm; |
| TestConsumer::shared_ptr c1(new TestConsumer()); |
| |
| BOOST_CHECK(queue->find(seq, qm)); |
| |
| BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); |
| |
| queue->acquire(qm, c1->getName()); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); |
| SequenceNumber seq1(3); |
| QueuedMessage qm1; |
| BOOST_CHECK(queue->find(seq1, qm1)); |
| BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); |
| |
| } |
| const std::string nullxid = ""; |
| |
| class SimpleDummyCtxt : public TransactionContext {}; |
| |
| class DummyCtxt : public TPCTransactionContext |
| { |
| const std::string xid; |
| public: |
| DummyCtxt(const std::string& _xid) : xid(_xid) {} |
| static std::string getXid(TransactionContext& ctxt) |
| { |
| DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); |
| return c ? c->xid : nullxid; |
| } |
| }; |
| |
| class TestMessageStoreOC : public MessageStore |
| { |
| std::set<std::string> prepared; |
| uint64_t nextPersistenceId; |
| public: |
| |
| uint enqCnt; |
| uint deqCnt; |
| bool error; |
| |
| TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} |
| ~TestMessageStoreOC(){} |
| |
| virtual void dequeue(TransactionContext*, |
| const boost::intrusive_ptr<PersistableMessage>& /*msg*/, |
| const PersistableQueue& /*queue*/) |
| { |
| if (error) throw Exception("Dequeue error test"); |
| deqCnt++; |
| } |
| |
| virtual void enqueue(TransactionContext*, |
| const boost::intrusive_ptr<PersistableMessage>& msg, |
| const PersistableQueue& /* queue */) |
| { |
| if (error) throw Exception("Enqueue error test"); |
| enqCnt++; |
| msg->enqueueComplete(); |
| } |
| |
| void createError() |
| { |
| error=true; |
| } |
| |
| bool init(const Options*) { return true; } |
| void truncateInit(const bool) {} |
| void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } |
| void destroy(PersistableQueue&) {} |
| void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } |
| void destroy(const PersistableExchange&) {} |
| void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} |
| void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} |
| void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } |
| void destroy(const PersistableConfig&) {} |
| void stage(const boost::intrusive_ptr<PersistableMessage>&) {} |
| void destroy(PersistableMessage&) {} |
| void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} |
| void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, |
| std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } |
| void flush(const qpid::broker::PersistableQueue&) {} |
| uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } |
| |
| std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } |
| std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } |
| void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } |
| void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } |
| void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } |
| void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } |
| |
| void recover(RecoveryManager&) {} |
| }; |
| |
| |
| QPID_AUTO_TEST_CASE(testLVQOrdering){ |
| |
| client::QueueOptions args; |
| // set queue mode |
| args.setOrdering(client::LVQ); |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true )); |
| queue->configure(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| intrusive_ptr<Message> msg4 = create_message("e", "D"); |
| intrusive_ptr<Message> received; |
| |
| //set deliever match for LVQ a,b,c,a |
| |
| string key; |
| args.getLVQKey(key); |
| BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); |
| |
| |
| msg1->insertCustomProperty(key,"a"); |
| msg2->insertCustomProperty(key,"b"); |
| msg3->insertCustomProperty(key,"c"); |
| msg4->insertCustomProperty(key,"a"); |
| |
| //enqueue 4 message |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| queue->deliver(msg3); |
| queue->deliver(msg4); |
| |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg4.get(), received.get()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg2.get(), received.get()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg3.get(), received.get()); |
| |
| intrusive_ptr<Message> msg5 = create_message("e", "A"); |
| intrusive_ptr<Message> msg6 = create_message("e", "B"); |
| intrusive_ptr<Message> msg7 = create_message("e", "C"); |
| msg5->insertCustomProperty(key,"a"); |
| msg6->insertCustomProperty(key,"b"); |
| msg7->insertCustomProperty(key,"c"); |
| queue->deliver(msg5); |
| queue->deliver(msg6); |
| queue->deliver(msg7); |
| |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg5.get(), received.get()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg6.get(), received.get()); |
| |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg7.get(), received.get()); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testLVQEmptyKey){ |
| |
| client::QueueOptions args; |
| // set queue mode |
| args.setOrdering(client::LVQ); |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true )); |
| queue->configure(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| |
| string key; |
| args.getLVQKey(key); |
| BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); |
| |
| |
| msg1->insertCustomProperty(key,"a"); |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testLVQAcquire){ |
| |
| client::QueueOptions args; |
| // set queue mode |
| args.setOrdering(client::LVQ); |
| // disable flow control, as this test violates the enqueue/dequeue sequence. |
| args.setInt(QueueFlowLimit::flowStopCountKey, 0); |
| |
| Queue::shared_ptr queue(new Queue("my-queue", true )); |
| queue->configure(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "B"); |
| intrusive_ptr<Message> msg3 = create_message("e", "C"); |
| intrusive_ptr<Message> msg4 = create_message("e", "D"); |
| intrusive_ptr<Message> msg5 = create_message("e", "F"); |
| intrusive_ptr<Message> msg6 = create_message("e", "G"); |
| |
| //set deliever match for LVQ a,b,c,a |
| |
| string key; |
| args.getLVQKey(key); |
| BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); |
| |
| |
| msg1->insertCustomProperty(key,"a"); |
| msg2->insertCustomProperty(key,"b"); |
| msg3->insertCustomProperty(key,"c"); |
| msg4->insertCustomProperty(key,"a"); |
| msg5->insertCustomProperty(key,"b"); |
| msg6->insertCustomProperty(key,"c"); |
| |
| //enqueue 4 message |
| queue->deliver(msg1); |
| queue->deliver(msg2); |
| queue->deliver(msg3); |
| queue->deliver(msg4); |
| |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); |
| |
| framing::SequenceNumber sequence(1); |
| QueuedMessage qmsg(queue.get(), msg1, sequence); |
| QueuedMessage qmsg2(queue.get(), msg2, ++sequence); |
| framing::SequenceNumber sequence1(10); |
| QueuedMessage qmsg3(queue.get(), 0, sequence1); |
| TestConsumer::shared_ptr dummy(new TestConsumer()); |
| |
| BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); |
| BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); |
| // Acquire the massage again to test failure case. |
| BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); |
| BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); |
| |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); |
| |
| queue->deliver(msg5); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); |
| |
| // set mode to no browse and check |
| args.setOrdering(client::LVQ_NO_BROWSE); |
| queue->configure(args); |
| TestConsumer::shared_ptr c1(new TestConsumer("test", false)); |
| |
| queue->dispatch(c1); |
| queue->dispatch(c1); |
| queue->dispatch(c1); |
| |
| queue->deliver(msg6); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); |
| |
| intrusive_ptr<Message> received; |
| received = queue->get().payload; |
| BOOST_CHECK_EQUAL(msg4.get(), received.get()); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testLVQMultiQueue){ |
| |
| client::QueueOptions args; |
| // set queue mode |
| args.setOrdering(client::LVQ); |
| |
| Queue::shared_ptr queue1(new Queue("my-queue", true )); |
| Queue::shared_ptr queue2(new Queue("my-queue", true )); |
| intrusive_ptr<Message> received; |
| queue1->configure(args); |
| queue2->configure(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "A"); |
| |
| string key; |
| args.getLVQKey(key); |
| BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); |
| |
| msg1->insertCustomProperty(key,"a"); |
| msg2->insertCustomProperty(key,"a"); |
| |
| queue1->deliver(msg1); |
| queue2->deliver(msg1); |
| queue1->deliver(msg2); |
| |
| received = queue1->get().payload; |
| BOOST_CHECK_EQUAL(msg2.get(), received.get()); |
| |
| received = queue2->get().payload; |
| BOOST_CHECK_EQUAL(msg1.get(), received.get()); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testLVQRecover){ |
| |
| /* simulate this |
| 1. start 2 nodes |
| 2. create cluster durable lvq |
| 3. send a transient message to the queue |
| 4. kill one of the nodes (to trigger force persistent behaviour)... |
| 5. then restart it (to turn off force persistent behaviour) |
| 6. send another transient message with same lvq key as in 3 |
| 7. kill the second node again (retrigger force persistent) |
| 8. stop and recover the first node |
| */ |
| TestMessageStoreOC testStore; |
| client::QueueOptions args; |
| // set queue mode |
| args.setOrdering(client::LVQ); |
| args.setPersistLastNode(); |
| |
| Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); |
| intrusive_ptr<Message> received; |
| queue1->create(args); |
| |
| intrusive_ptr<Message> msg1 = create_message("e", "A"); |
| intrusive_ptr<Message> msg2 = create_message("e", "A"); |
| // 2 |
| string key; |
| args.getLVQKey(key); |
| BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); |
| |
| msg1->insertCustomProperty(key,"a"); |
| msg2->insertCustomProperty(key,"a"); |
| // 3 |
| queue1->deliver(msg1); |
| // 4 |
| queue1->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); |
| // 5 |
| queue1->clearLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); |
| // 6 |
| queue1->deliver(msg2); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); |
| queue1->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); |
| BOOST_CHECK_EQUAL(testStore.deqCnt, 1u); |
| } |
| |
| void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) |
| { |
| for (uint i = 0; i < count; i++) { |
| intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); |
| m->computeExpiration(new broker::ExpiryPolicy); |
| queue.deliver(m); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testPurgeExpired) { |
| Queue queue("my-queue"); |
| addMessagesToQueue(10, queue); |
| BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); |
| ::usleep(300*1000); |
| queue.purgeExpired(0); |
| BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); |
| } |
| |
| QPID_AUTO_TEST_CASE(testQueueCleaner) { |
| Timer timer; |
| QueueRegistry queues; |
| Queue::shared_ptr queue = queues.declare("my-queue").first; |
| addMessagesToQueue(10, *queue, 200, 400); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); |
| |
| QueueCleaner cleaner(queues, &timer); |
| cleaner.start(100 * qpid::sys::TIME_MSEC); |
| ::usleep(300*1000); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); |
| ::usleep(300*1000); |
| BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); |
| } |
| |
| |
| namespace { |
| // helper for group tests |
| void verifyAcquire( Queue::shared_ptr queue, |
| TestConsumer::shared_ptr c, |
| std::deque<QueuedMessage>& results, |
| const std::string& expectedGroup, |
| const int expectedId ) |
| { |
| queue->dispatch(c); |
| results.push_back(c->last); |
| std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); |
| int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); |
| BOOST_CHECK_EQUAL( group, expectedGroup ); |
| BOOST_CHECK_EQUAL( id, expectedId ); |
| } |
| } |
| |
| QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { |
| // |
| // Verify that consumers of grouped messages own the groups once a message is acquired, |
| // and release the groups once all acquired messages have been dequeued or requeued |
| // |
| FieldTable args; |
| Queue::shared_ptr queue(new Queue("my_queue", true)); |
| args.setString("qpid.group_header_key", "GROUP-ID"); |
| args.setInt("qpid.shared_msg_group", 1); |
| queue->configure(args); |
| |
| std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), |
| std::string("b"), std::string("b"), std::string("b"), |
| std::string("c"), std::string("c"), std::string("c") }; |
| for (int i = 0; i < 9; ++i) { |
| intrusive_ptr<Message> msg = create_message("e", "A"); |
| msg->insertCustomProperty("GROUP-ID", groups[i]); |
| msg->insertCustomProperty("MY-ID", i); |
| queue->deliver(msg); |
| } |
| |
| // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, |
| |
| BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); |
| |
| TestConsumer::shared_ptr c1(new TestConsumer("C1")); |
| TestConsumer::shared_ptr c2(new TestConsumer("C2")); |
| |
| queue->consume(c1); |
| queue->consume(c2); |
| |
| std::deque<QueuedMessage> dequeMeC1; |
| std::deque<QueuedMessage> dequeMeC2; |
| |
| |
| verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) |
| verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) |
| |
| // now let c1 complete the 'a-0' message - this should free the 'a' group |
| queue->dequeue( 0, dequeMeC1.front() ); |
| dequeMeC1.pop_front(); |
| |
| // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- |
| |
| // now c2 should pick up the next 'a-1', since it is oldest free |
| verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" |
| |
| // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- |
| |
| // c1 should only be able to snarf up the first "c" message now... |
| verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" |
| |
| // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 |
| |
| // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) |
| queue->dequeue( 0, dequeMeC2.front() ); |
| dequeMeC2.pop_front(); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 |
| |
| // b group is free, c is owned by c1 - c1's next get should grab 'b-4' |
| verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... |
| // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 |
| |
| // c2 can now only grab a-2, and that's all |
| verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); |
| |
| // now C2 can't get any more, since C1 owns "b" and "c" group... |
| bool gotOne = queue->dispatch(c2); |
| BOOST_CHECK( !gotOne ); |
| |
| // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) |
| queue->dequeue( 0, dequeMeC1.front() ); |
| dequeMeC1.pop_front(); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-7, c-8... |
| // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- |
| |
| // c2 can now grab c-7 |
| verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-7, c-8... |
| // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 |
| |
| // what happens if C-2 "requeues" a-1 and a-2? |
| queue->requeue( dequeMeC2.front() ); |
| dequeMeC2.pop_front(); |
| queue->requeue( dequeMeC2.front() ); |
| dequeMeC2.pop_front(); // now just has c-7 acquired |
| |
| // Queue = a-1, a-2, b-4, b-5, c-7, c-8... |
| // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 |
| |
| // now c1 will grab a-1 and a-2... |
| verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); |
| verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-7, c-8... |
| // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 |
| |
| // c2 can now acquire c-8 only |
| verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); |
| |
| // and c1 can get b-5 |
| verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); |
| |
| // should be no more acquire-able for anyone now: |
| gotOne = queue->dispatch(c1); |
| BOOST_CHECK( !gotOne ); |
| gotOne = queue->dispatch(c2); |
| BOOST_CHECK( !gotOne ); |
| |
| // requeue all of C1's acquired messages, then cancel C1 |
| while (!dequeMeC1.empty()) { |
| queue->requeue(dequeMeC1.front()); |
| dequeMeC1.pop_front(); |
| } |
| queue->cancel(c1); |
| |
| // Queue = a-1, a-2, b-4, b-5, c-7, c-8... |
| // Owners= ---, ---, ---, ---, ^C2, ^C2 |
| |
| // b-4, a-1, a-2, b-5 all should be available, right? |
| verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); |
| |
| while (!dequeMeC2.empty()) { |
| queue->dequeue(0, dequeMeC2.front()); |
| dequeMeC2.pop_front(); |
| } |
| |
| // Queue = a-2, b-4, b-5 |
| // Owners= ---, ---, --- |
| |
| TestConsumer::shared_ptr c3(new TestConsumer("C3")); |
| std::deque<QueuedMessage> dequeMeC3; |
| |
| verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); |
| verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); |
| |
| // Queue = a-2, b-4, b-5 |
| // Owners= ^C3, ^C2, ^C2 |
| |
| gotOne = queue->dispatch(c3); |
| BOOST_CHECK( !gotOne ); |
| |
| verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); |
| |
| while (!dequeMeC2.empty()) { |
| queue->dequeue(0, dequeMeC2.front()); |
| dequeMeC2.pop_front(); |
| } |
| |
| // Queue = a-2, |
| // Owners= ^C3, |
| |
| intrusive_ptr<Message> msg = create_message("e", "A"); |
| msg->insertCustomProperty("GROUP-ID", "a"); |
| msg->insertCustomProperty("MY-ID", 9); |
| queue->deliver(msg); |
| |
| // Queue = a-2, a-9 |
| // Owners= ^C3, ^C3 |
| |
| gotOne = queue->dispatch(c2); |
| BOOST_CHECK( !gotOne ); |
| |
| msg = create_message("e", "A"); |
| msg->insertCustomProperty("GROUP-ID", "b"); |
| msg->insertCustomProperty("MY-ID", 10); |
| queue->deliver(msg); |
| |
| // Queue = a-2, a-9, b-10 |
| // Owners= ^C3, ^C3, ---- |
| |
| verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); |
| verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); |
| |
| gotOne = queue->dispatch(c3); |
| BOOST_CHECK( !gotOne ); |
| |
| queue->cancel(c2); |
| queue->cancel(c3); |
| } |
| |
| |
| QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { |
| // |
| // Verify that the same default group name is automatically applied to messages that |
| // do not specify a group name. |
| // |
| // Three messages without a "GROUP-ID" header are enqueued and two consumers compete |
| // for them. The test expects the first consumer to receive messages 0 and 1 while the |
| // second receives nothing, and the second consumer to receive message 2 only after |
| // the first consumer's messages have been dequeued. |
| // |
| FieldTable args; |
| Queue::shared_ptr queue(new Queue("my_queue", true)); |
| args.setString("qpid.group_header_key", "GROUP-ID"); |
| args.setInt("qpid.shared_msg_group", 1); |
| queue->configure(args); |
| |
| // Each message is tagged with MY-ID = 0, 1, 2 so the delivery order can be verified. |
| for (int i = 0; i < 3; ++i) { |
| intrusive_ptr<Message> msg = create_message("e", "A"); |
| // no "GROUP-ID" header |
| msg->insertCustomProperty("MY-ID", i); |
| queue->deliver(msg); |
| } |
| |
| // Queue = 0, 1, 2 |
| |
| BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); |
| |
| TestConsumer::shared_ptr c1(new TestConsumer("C1")); |
| TestConsumer::shared_ptr c2(new TestConsumer("C2")); |
| |
| queue->consume(c1); |
| queue->consume(c2); |
| |
| // Messages acquired by c1 that still have to be dequeued. |
| std::deque<QueuedMessage> dequeMeC1; |
| |
| // BOOST_REQUIRE (rather than BOOST_CHECK) is used wherever the delivered message is |
| // inspected afterwards, so a failed dispatch ends the test case at the dispatch |
| // instead of at the payload access that follows it. |
| bool gotOne = queue->dispatch(c1); // c1 now owns default group (acquired 0) |
| BOOST_REQUIRE( gotOne ); |
| dequeMeC1.push_back(c1->last); |
| int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); |
| BOOST_CHECK_EQUAL( id, 0 ); |
| |
| gotOne = queue->dispatch(c2); // c2 should get nothing |
| BOOST_CHECK( !gotOne ); |
| |
| gotOne = queue->dispatch(c1); // c1 now acquires 1 |
| BOOST_REQUIRE( gotOne ); |
| dequeMeC1.push_back(c1->last); |
| id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); |
| BOOST_CHECK_EQUAL( id, 1 ); |
| |
| gotOne = queue->dispatch(c2); // c2 should still get nothing |
| BOOST_CHECK( !gotOne ); |
| |
| // Dequeue everything c1 acquired. |
| while (!dequeMeC1.empty()) { |
| queue->dequeue(0, dequeMeC1.front()); |
| dequeMeC1.pop_front(); |
| } |
| |
| // now default group should be available... |
| gotOne = queue->dispatch(c2); // c2 now owns default group (acquired 2) |
| BOOST_REQUIRE( gotOne ); |
| id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); |
| BOOST_CHECK_EQUAL( id, 2 ); |
| |
| gotOne = queue->dispatch(c1); // c1 should get nothing |
| BOOST_CHECK( !gotOne ); |
| |
| queue->cancel(c1); |
| queue->cancel(c2); |
| } |
| |
| QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ |
| // |
| // Two queues created with the persist-last-node option share a single test store. |
| // After each step the store's enqCnt counter (declared with TestMessageStoreOC earlier |
| // in this file) is compared with the value the test expects. |
| // |
| TestMessageStoreOC testStore; |
| client::QueueOptions lastNodeOptions; |
| lastNodeOptions.setPersistLastNode(); |
| |
| Queue::shared_ptr firstQueue(new Queue("queue1", true, &testStore)); |
| firstQueue->create(lastNodeOptions); |
| Queue::shared_ptr secondQueue(new Queue("queue2", true, &testStore)); |
| secondQueue->create(lastNodeOptions); |
| |
| // The same message is placed on both queues. |
| intrusive_ptr<Message> firstMsg = create_message("e", "A"); |
| |
| firstQueue->deliver(firstMsg); |
| secondQueue->deliver(firstMsg); |
| |
| // Switch each queue into last-node-failure mode: one enqueue per queue is expected. |
| firstQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); |
| |
| // Repeating the switch must not store the same messages a second time. |
| firstQueue->setLastNodeFailure(); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); |
| |
| intrusive_ptr<Message> secondMsg = create_message("e", "B"); |
| firstQueue->deliver(secondMsg); |
| secondQueue->deliver(secondMsg); |
| |
| firstQueue->clearLastNodeFailure(); |
| secondQueue->clearLastNodeFailure(); |
| // After clearing and re-entering the mode, only the new message is forced per queue. |
| firstQueue->setLastNodeFailure(); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); |
| |
| // Messages delivered while the mode is cleared are not stored until it is set again. |
| firstQueue->clearLastNodeFailure(); |
| secondQueue->clearLastNodeFailure(); |
| |
| intrusive_ptr<Message> thirdMsg = create_message("e", "B"); |
| firstQueue->deliver(thirdMsg); |
| secondQueue->deliver(thirdMsg); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); |
| firstQueue->setLastNodeFailure(); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); |
| |
| // Requeue case 1: requeue while the queue is in last-node-failure mode. |
| intrusive_ptr<Message> requeuedOnFirst = create_message("e", "C"); |
| intrusive_ptr<Message> requeuedOnSecond = create_message("e", "D"); |
| |
| framing::SequenceNumber seqNo(1); |
| QueuedMessage queuedOnFirst(firstQueue.get(), requeuedOnFirst, seqNo); |
| QueuedMessage queuedOnSecond(secondQueue.get(), requeuedOnSecond, ++seqNo); |
| |
| firstQueue->requeue(queuedOnFirst); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); |
| |
| // Requeue case 2: requeue with the mode cleared, then set the mode afterwards. |
| secondQueue->clearLastNodeFailure(); |
| secondQueue->requeue(queuedOnSecond); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); |
| |
| // A further clear/set cycle is expected to leave the counter unchanged. |
| secondQueue->clearLastNodeFailure(); |
| secondQueue->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); |
| } |
| |
| QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){ |
| /* |
| simulate this: |
| 1. start two nodes |
| 2. create cluster durable queue and add some messages |
| 3. kill one node (trigger force-persistent behaviour) |
| 4. stop and recover remaining node |
| 5. add another node |
| 6. kill that new node again |
| make sure that an attempt to re-enqueue a message does not happen which will |
| result in the last man standing exiting with an error. |
| |
| we need to make sure that recover is safe, i.e. messages are |
| not requeued to the store. |
| */ |
| TestMessageStoreOC testStore; |
| client::QueueOptions args; |
| // set queue mode |
| args.setPersistLastNode(); |
| |
| Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); |
| queue1->create(args); |
| |
| // msg1 reaches the queue through recover(), msg2 through a normal deliver() |
| intrusive_ptr<Message> msg1 = create_message("e", "C"); |
| intrusive_ptr<Message> msg2 = create_message("e", "D"); |
| |
| queue1->recover(msg1); |
| |
| // entering last-node-failure mode must not enqueue the recovered message |
| queue1->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); |
| |
| queue1->clearLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); |
| |
| // only the normally delivered message is expected to be enqueued, and only once |
| // the queue re-enters last-node-failure mode |
| queue1->deliver(msg2); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); |
| queue1->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); |
| |
| } |
| |
| QPID_AUTO_TEST_CASE(testLastNodeJournalError){ |
| /* |
| simulate store exception going into last node standing |
| |
| The store is put into an error state (testStore.createError()) after one message |
| has been delivered; switching the queue to last-node-failure mode is then expected |
| to leave the store's enqCnt at zero. |
| */ |
| TestMessageStoreOC testStore; |
| client::QueueOptions args; |
| // set queue mode |
| args.setPersistLastNode(); |
| |
| Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); |
| queue1->configure(args); |
| |
| // deliver one message, then make the store report an error |
| intrusive_ptr<Message> msg1 = create_message("e", "C"); |
| |
| queue1->deliver(msg1); |
| testStore.createError(); |
| |
| ScopedSuppressLogging sl; // Suppress messages for expected errors. |
| queue1->setLastNodeFailure(); |
| BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); |
| |
| } |
| |
| // Test helper: creates a message through MessageUtils::createMessage (passing the |
| // durable flag), attaches `content` when it is non-empty, and associates the |
| // message with `store` before returning it. |
| intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false) |
| { |
| intrusive_ptr<Message> result = MessageUtils::createMessage("", "", durable); |
| if (!content.empty()) { |
| MessageUtils::addContent(result, content); |
| } |
| result->setStore(&store); |
| return result; |
| } |
| |
| QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ |
| // |
| // Routes messages through fanout exchanges into queues with and without a |
| // FLOW_TO_DISK policy. After every route the test calls tryReleaseContent() on the |
| // message and checks isContentReleased() plus the message count of each bound queue. |
| // Four binding configurations are exercised in turn: |
| // 1. a single queue created without a store ("transient"), |
| // 2. a single queue created with a store ("durable"), |
| // 3. several durable queues, later joined by a transient one, |
| // 4. a mix of durable and transient queues from the start. |
| // Each configuration is fed the same message kinds: transient or durable, with or |
| // without content. |
| // |
| |
| TestMessageStoreOC testStore; |
| client::QueueOptions args0; // No size policy |
| // args1 / args2: FLOW_TO_DISK policies. The comments in this test treat the last |
| // argument as a message-count limit (1 and 2) - TODO confirm against |
| // QueueOptions::setSizePolicy. |
| client::QueueOptions args1; |
| args1.setSizePolicy(FLOW_TO_DISK, 0, 1); |
| client::QueueOptions args2; |
| args2.setSizePolicy(FLOW_TO_DISK, 0, 2); |
| |
| // --- Fanout exchange bound to single transient queue ------------------------------------------------------------- |
| // Expected here: the first message is accepted; every later route throws |
| // ResourceLimitExceededException, no content is released, and the count stays at 1. |
| |
| FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue |
| Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit |
| tq1->configure(args1); |
| sbtFanout1.bind(tq1, "", 0); |
| |
| intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg01(msg01); |
| sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit |
| msg01->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg01->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |
| |
| intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg02(msg02); |
| { |
| ScopedSuppressLogging sl; // suppress expected error messages. |
| BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); |
| } |
| msg02->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |
| |
| intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content |
| DeliverableMessage dmsg03(msg03); |
| { |
| ScopedSuppressLogging sl; // suppress expected error messages. |
| BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); |
| } |
| msg03->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |
| |
| intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content |
| DeliverableMessage dmsg04(msg04); |
| { |
| ScopedSuppressLogging sl; // suppress expected error messages. |
| BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); |
| } |
| msg04->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |
| |
| intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content |
| DeliverableMessage dmsg05(msg05); |
| { |
| ScopedSuppressLogging sl; // suppress expected error messages. |
| BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); |
| } |
| msg05->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); |
| |
| // --- Fanout exchange bound to single durable queue --------------------------------------------------------------- |
| // Expected here: no route throws; once the queue holds its first message, every |
| // further message has its content released and the count keeps growing. |
| |
| FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue |
| Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit |
| dq2->configure(args1); |
| sbdFanout2.bind(dq2, "", 0); |
| |
| intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg06(msg06); |
| sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit |
| msg06->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg06->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, dq2->getMessageCount()); |
| |
| intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg07(msg07); |
| sbdFanout2.route(dmsg07, "", 0); |
| msg07->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg07->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(2u, dq2->getMessageCount()); |
| |
| intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content |
| DeliverableMessage dmsg08(msg08); |
| sbdFanout2.route(dmsg08, "", 0); |
| msg08->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg08->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(3u, dq2->getMessageCount()); |
| |
| intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content |
| DeliverableMessage dmsg09(msg09); |
| sbdFanout2.route(dmsg09, "", 0); |
| msg09->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg09->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(4u, dq2->getMessageCount()); |
| |
| intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content |
| DeliverableMessage dmsg10(msg10); |
| sbdFanout2.route(dmsg10, "", 0); |
| msg10->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg10->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(5u, dq2->getMessageCount()); |
| |
| // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------ |
| // Expected here: every queue receives every message; durable messages are released |
| // while transient ones are not (see the XXXX notes below). |
| |
| FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues |
| Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2 |
| dq3->configure(args2); |
| mbdFanout3.bind(dq3, "", 0); |
| Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1 |
| dq4->configure(args1); |
| mbdFanout3.bind(dq4, "", 0); |
| Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit |
| dq5->configure(args0); |
| mbdFanout3.bind(dq5, "", 0); |
| |
| intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg11(msg11); |
| mbdFanout3.route(dmsg11, "", 0); // Brings queue 4 (limit 1) to capacity limit; queue 3 (limit 2) still has room for one more |
| msg11->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg11->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(1u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(1u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg12(msg12); |
| mbdFanout3.route(dmsg12, "", 0); |
| msg12->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! |
| BOOST_CHECK_EQUAL(2u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(2u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(2u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content |
| DeliverableMessage dmsg13(msg13); |
| mbdFanout3.route(dmsg13, "", 0); |
| msg13->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg13->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(3u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(3u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(3u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content |
| DeliverableMessage dmsg14(msg14); |
| mbdFanout3.route(dmsg14, "", 0); |
| msg14->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! |
| BOOST_CHECK_EQUAL(4u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(4u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(4u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content |
| DeliverableMessage dmsg15(msg15); |
| mbdFanout3.route(dmsg15, "", 0); |
| msg15->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg15->isContentReleased(), true); |
| BOOST_CHECK_EQUAL(5u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(5u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(5u, dq5->getMessageCount()); |
| |
| // Bind a transient queue, this should block the release of any further messages. |
| // Note: this will result in a violation of the count policy of dq3 and dq4 - but this |
| // is expected until a better overall multi-queue design is implemented. Similarly |
| // for the other tests in this section. |
| |
| Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit |
| tq6->configure(args0); |
| mbdFanout3.bind(tq6, "", 0); |
| |
| intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg16(msg16); |
| mbdFanout3.route(dmsg16, "", 0); |
| msg16->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg16->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(6u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(6u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(6u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content |
| DeliverableMessage dmsg17(msg17); |
| mbdFanout3.route(dmsg17, "", 0); |
| msg17->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg17->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(7u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(7u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(7u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content |
| DeliverableMessage dmsg18(msg18); |
| mbdFanout3.route(dmsg18, "", 0); |
| msg18->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg18->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(8u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(8u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(8u, dq5->getMessageCount()); |
| |
| intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content |
| DeliverableMessage dmsg19(msg19); |
| mbdFanout3.route(dmsg19, "", 0); |
| msg19->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg19->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(9u, dq3->getMessageCount()); |
| BOOST_CHECK_EQUAL(9u, dq4->getMessageCount()); |
| BOOST_CHECK_EQUAL(9u, dq5->getMessageCount()); |
| |
| |
| // --- Fanout exchange bound to multiple durable and transient queues ---------------------------------------------- |
| // Expected here: with a transient queue bound from the start, no message content is |
| // released at all, while every queue still receives every message. Only dq8 has a |
| // limit (args1); dq7 and tq9 are configured with args0. |
| |
| FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues |
| Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit |
| dq7->configure(args0); |
| mbmFanout4.bind(dq7, "", 0); |
| Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit |
| dq8->configure(args1); |
| mbmFanout4.bind(dq8, "", 0); |
| Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit |
| tq9->configure(args0); |
| mbmFanout4.bind(tq9, "", 0); |
| |
| intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg20(msg20); |
| mbmFanout4.route(dmsg20, "", 0); // Brings queue 8 (dq8, limit 1) to capacity limit; dq7 has no limit |
| msg20->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg20->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(1u, dq7->getMessageCount()); |
| BOOST_CHECK_EQUAL(1u, dq8->getMessageCount()); |
| BOOST_CHECK_EQUAL(1u, tq9->getMessageCount()); |
| |
| intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content |
| DeliverableMessage dmsg21(msg21); |
| mbmFanout4.route(dmsg21, "", 0); |
| msg21->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg21->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // dq7 has no limit; dq8 (limit 1) is the queue now over its limit |
| BOOST_CHECK_EQUAL(2u, dq8->getMessageCount()); |
| BOOST_CHECK_EQUAL(2u, tq9->getMessageCount()); |
| |
| intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content |
| DeliverableMessage dmsg22(msg22); |
| mbmFanout4.route(dmsg22, "", 0); |
| msg22->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg22->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // no limit configured |
| BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit |
| BOOST_CHECK_EQUAL(3u, tq9->getMessageCount()); |
| |
| intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content |
| DeliverableMessage dmsg23(msg23); |
| mbmFanout4.route(dmsg23, "", 0); |
| msg23->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg23->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // no limit configured |
| BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit |
| BOOST_CHECK_EQUAL(4u, tq9->getMessageCount()); |
| |
| intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content |
| DeliverableMessage dmsg24(msg24); |
| mbmFanout4.route(dmsg24, "", 0); |
| msg24->tryReleaseContent(); |
| BOOST_CHECK_EQUAL(msg24->isContentReleased(), false); |
| BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // no limit configured |
| BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit |
| BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); |
| } |
| |
| |
| QPID_AUTO_TEST_SUITE_END() |
| |
| }} // namespace qpid::tests |