blob: f4a38ae8610a639c17792553296399478cdffc9f [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test_tools.h"
#include "unit_test.h"
#include "ForkedBroker.h"
#include "BrokerFixture.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
#include <iostream>
#include <iterator>
#include <vector>
#include <set>
#include <algorithm>
#include <iterator>
namespace std { // ostream operators in std:: namespace
template <class T>
ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
}
QPID_AUTO_TEST_SUITE(cluster)
using namespace std;
using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
/** Parse broker & cluster options */
Broker::Options parseOpts(size_t argc, const char* argv[]) {
Broker::Options opts;
Plugin::addOptions(opts); // Pick up cluster options.
opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
return opts;
}
/** Cluster fixture is a vector of ports for the replicas.
*
* At most one replica (by default replica 0) is in the current
* process, all others are forked as children.
*/
class ClusterFixture : public vector<uint16_t> {
string name;
std::auto_ptr<BrokerFixture> localBroker;
int localIndex;
std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
public:
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
ClusterFixture(size_t n, int localIndex=0);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
void addLocal(); // Add a local broker.
void setup();
bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
/** Kill a forked broker with sig, or shutdown localBroker if n==0. */
void kill(size_t n, int sig=SIGINT) {
if (n == size_t(localIndex))
localBroker->broker->shutdown();
else
forkedBrokers[n]->kill(sig);
}
/** Kill a broker and suppress errors from connection. */
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT) {
ScopedSuppressLogging sl;
kill(n,sig);
try { c.close(); } catch(...) {}
}
};
ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) {
add(n);
}
void ClusterFixture::add() {
if (size() != size_t(localIndex)) { // fork a broker process.
std::ostringstream os; os << "fork" << size();
std::string prefix = os.str();
const char* argv[] = {
"qpidd " __FILE__ ,
"--no-module-dir",
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
"--log-prefix", prefix.c_str(),
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv)));
push_back(forkedBrokers.back()->getPort());
}
else { // Run in this process
addLocal();
}
}
void ClusterFixture::addLocal() {
assert(int(size()) == localIndex || localIndex == -1);
localIndex = size();
const char* argv[] = {
"qpidd " __FILE__ ,
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir"
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
ostringstream os; os << "local" << localIndex;
qpid::log::Logger::instance().setPrefix(os.str());
localBroker.reset(new BrokerFixture(parseOpts(argc, argv)));
push_back(localBroker->getPort());
forkedBrokers.push_back(shared_ptr<ForkedBroker>());
}
ostream& operator<<(ostream& o, const cpg_name* n) {
return o << qpid::cluster::Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
}
template <class T>
ostream& operator<<(ostream& o, const pair<T*, int>& array) {
o << "{ ";
ostream_iterator<cpg_address> i(o, " ");
copy(array.first, array.first+array.second, i);
o << "}";
return o;
}
template <class C> set<uint16_t> makeSet(const C& c) {
set<uint16_t> s;
std::copy(c.begin(), c.end(), std::inserter(s, s.begin()));
return s;
}
template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
vector<Url> urls = source.getKnownBrokers();
if (n >= 0 && unsigned(n) != urls.size()) {
BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
::usleep(1000*100); // 0.1 secs
urls = source.getKnownBrokers();
}
}
BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
set<uint16_t> s;
for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
s.insert((*i)[0].get<TcpAddress>()->port);
return s;
}
class Sender {
public:
Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
AMQFrame f(body);
f.setChannel(channel);
f.setFirstSegment(firstSeg);
f.setLastSegment(lastSeg);
f.setFirstFrame(firstFrame);
f.setLastFrame(lastFrame);
connection->handle(f);
}
private:
boost::shared_ptr<ConnectionImpl> connection;
uint16_t channel;
};
int64_t getMsgSequence(const Message& m) {
return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
}
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
FieldTable args;
args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
c0.session.queueDeclare(arg::queue="q");
c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
cluster.add();
Client c1(cluster[1]);
c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
}
QPID_AUTO_TEST_CASE(testUnsupported) {
ScopedSuppressLogging sl;
ClusterFixture cluster(1);
Client c1(cluster[0], "c1");
BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);
Client c2(cluster[0], "c2");
Message m;
m.getDeliveryProperties().setTtl(1);
BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare(arg::queue="q");
c0.session.messageTransfer(arg::content=Message("A", "q"));
c0.session.messageTransfer(arg::content=Message("B", "q"));
// Start a transaction that will commit.
Session commitSession = c0.connection.newSession("commit");
SubscriptionManager commitSubs(commitSession);
commitSession.txSelect();
commitSession.messageTransfer(arg::content=Message("a", "q"));
commitSession.messageTransfer(arg::content=Message("b", "q"));
BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
rollbackSession.messageTransfer(arg::content=Message("1", "q"));
Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
// Add new member mid transaction.
cluster.add();
Client c1(cluster[1], "c1");
// More transactional work
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
rollbackSession.messageTransfer(arg::content=Message("2", "q"));
commitSession.messageTransfer(arg::content=Message("c", "q"));
rollbackSession.messageTransfer(arg::content=Message("3", "q"));
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Commit/roll back.
commitSession.txCommit();
rollbackSession.txRollback();
rollbackSession.messageRelease(rollbackMessage.getId());
// Verify queue status: just the comitted messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
}
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
Message m;
// Create unacked message: acquired but not accepted.
SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
c0.session.queueDeclare("q1");
c0.session.messageTransfer(arg::content=Message("11","q1"));
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
// Create unacked message: not acquired, accepted or completeed.
SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
c0.session.queueDeclare("q2");
c0.session.messageTransfer(arg::content=Message("21","q2"));
c0.session.messageTransfer(arg::content=Message("22","q2"));
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(m.getData(), "21");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
c0.subs.getSubscription("q2").acquire(m); // Acquire manually
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
// Create empty credit record: acquire and accept but don't complete.
SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
c0.session.queueDeclare("q3");
c0.session.messageTransfer(arg::content=Message("31", "q3"));
c0.session.messageTransfer(arg::content=Message("32", "q3"));
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
Message m31=q3.get(TIME_SEC);
BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
// Add new member while there are unacked messages.
cluster.add();
Client c1(cluster[1], "c1");
// Check queue counts
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
// Complete the empty credit message, should unblock the message behind it.
BOOST_CHECK_THROW(q3.get(0), Exception);
c0.session.markCompleted(SequenceSet(m31.getId()), true);
BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
// Close the original session - unacked messages should be requeued.
c0.session.close();
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11");
BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21");
BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
}
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
// Verify that we dump transaction state correctly to new members.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
// Do work in a transaction.
c0.session.txSelect();
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("1","q"));
c0.session.messageTransfer(arg::content=Message("2","q"));
Message m;
BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "1");
// New member, TX not comitted, c1 should see nothing.
cluster.add();
Client c1(cluster[1], "c1");
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
// After commit c1 shoudl see results of tx.
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
c0.session.messageTransfer(arg::content=Message("3","q"));
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "3");
}
QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
// Verify that we dump a partially recieved message to a new member.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
// Send first 2 frames of message.
MessageTransferBody transfer(
ProtocolVersion(), std::string(), // default exchange.
framing::message::ACCEPT_MODE_NONE,
framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
sender.send(transfer, true, false, true, true);
AMQHeaderBody header;
header.get<DeliveryProperties>(true)->setRoutingKey("q");
sender.send(header, false, false, true, true);
// No reliable way to ensure the partial message has arrived
// before we start the new broker, so we sleep.
::usleep(2500);
cluster.add();
// Send final 2 frames of message.
sender.send(AMQContentBody("ab"), false, true, true, false);
sender.send(AMQContentBody("cd"), false, true, false, true);
// Verify message is enqued correctly on second member.
Message m;
Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "abcd");
BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
cluster.add();
Client c1(cluster[1], "c1");
set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
kb0 = knownBrokerPorts(c0.connection, 2);
BOOST_CHECK_EQUAL(kb1.size(), 2u);
BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
BOOST_CHECK_EQUAL(kb1,kb0);
cluster.add();
Client c2(cluster[2], "c2");
set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
kb1 = knownBrokerPorts(c1.connection, 3);
kb0 = knownBrokerPorts(c0.connection, 3);
BOOST_CHECK_EQUAL(kb2.size(), 3u);
BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
BOOST_CHECK_EQUAL(kb2,kb0);
BOOST_CHECK_EQUAL(kb2,kb1);
cluster.killWithSilencer(1,c1.connection,9);
kb0 = knownBrokerPorts(c0.connection, 2);
kb2 = knownBrokerPorts(c2.connection, 2);
BOOST_CHECK_EQUAL(kb0.size(), 2u);
BOOST_CHECK_EQUAL(kb0, kb2);
}
QPID_AUTO_TEST_CASE(DumpConsumers) {
ClusterFixture cluster(1, 1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("p");
c0.session.queueDeclare("q");
c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
LocalQueue lp;
c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
c0.session.sync();
// Start new members
cluster.add(); // Local
Client c1(cluster[1], "c1");
cluster.add();
Client c2(cluster[2], "c2");
// Transfer messages
c0.session.messageTransfer(arg::content=Message("aaa", "q"));
c0.session.messageTransfer(arg::content=Message("bbb", "p"));
c0.session.messageTransfer(arg::content=Message("ccc", "p"));
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
BOOST_CHECK(c0.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
// Check second subscription's flow control: gets first message, not second.
BOOST_CHECK(lp.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bbb");
BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "ccc");
// Kill the subscribing member, ensure further messages are not removed.
cluster.killWithSilencer(0,c0.connection,9);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
c1.session.messageTransfer(arg::content=Message("xxx", "q"));
BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
// Create some shared state.
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo","q"));
c0.session.messageTransfer(arg::content=Message("bar","q"));
while (c0.session.queueQuery("q").getMessageCount() != 2)
::usleep(1000); // Wait for message to show up on broker 0.
// Add a new broker, it should catch up.
cluster.add();
// Do some work post-add
c0.session.queueDeclare("p");
c0.session.messageTransfer(arg::content=Message("pfoo","p"));
// Do some work post-join
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
c0.session.messageTransfer(arg::content=Message("pbar","p"));
// Verify new brokers have state.
Message m;
Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "foo");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Add another broker, don't wait for join - should be stalled till ready.
cluster.add();
Client c2(cluster[2], "c2");
BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pfoo");
BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pbar");
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterFixture cluster(3);
Client c0(cluster[0]);
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
c0.session.queueDeclare("q");
c0.session.exchangeDeclare("ex", arg::type="direct");
c0.session.close();
c0.connection.close();
// Verify all brokers get wiring update.
for (size_t i = 0; i < cluster.size(); ++i) {
BOOST_MESSAGE("i == "<< i);
Client c(cluster[i]);
BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
}
}
QPID_AUTO_TEST_CASE(testMessageEnqueue) {
// Enqueue on one broker, dequeue on another.
ClusterFixture cluster(2);
Client c0(cluster[0]);
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo", "q"));
c0.session.messageTransfer(arg::content=Message("bar", "q"));
c0.session.close();
Client c1(cluster[1]);
Message msg;
BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
BOOST_CHECK_EQUAL(string("foo"), msg.getData());
BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
ClusterFixture cluster(3);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo", "q"));
c0.session.messageTransfer(arg::content=Message("bar", "q"));
Message msg;
// Dequeue on 2 others, ensure correct order.
Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("foo", msg.getData());
Client c2(cluster[2], "c2");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
// Queue should be empty on all cluster members.
BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
ClusterFixture cluster(3);
Client c0(cluster[0]);
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
// First start a subscription.
c0.session.queueDeclare("q");
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
c1.session.messageTransfer(arg::content=Message("foo", "q"));
c1.session.messageTransfer(arg::content=Message("bar", "q"));
// Check they arrived
Message m;
BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
BOOST_CHECK_EQUAL("foo", m.getData());
BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
BOOST_CHECK_EQUAL("bar", m.getData());
// Queue should be empty on all cluster members.
Client c2(cluster[2]);
BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
QPID_AUTO_TEST_SUITE_END()