blob: 70d60b10b45c2ce9dfff0ae6313de3bdfdd5fa0a [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test_tools.h"
#include "unit_test.h"
#include "ForkedBroker.h"
#include "BrokerFixture.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/UpdateClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/assign.hpp>
#include <string>
#include <iostream>
#include <iterator>
#include <vector>
#include <set>
#include <algorithm>
#include <iterator>
using namespace std;
using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
using qpid::sys::TIME_SEC;
using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
using boost::assign::list_of;
#include "ClusterFixture.h"
ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_, const string& clusterLib_)
: name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_), clusterLib(clusterLib_)
{
add(n);
}
ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_, const string& clusterLib_)
: name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_), clusterLib(clusterLib_)
{
add(n);
}
const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
list_of<string>("--auth=no")("--no-data-dir");
ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
Args args = list_of<string>("qpidd ")
("--no-module-dir")
("--load-module")(clusterLib)
("--cluster-name")(name)
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
if (updateArgs) updateArgs(args, index);
return args;
}
void ClusterFixture::add() {
if (size() != size_t(localIndex)) { // fork a broker process.
std::ostringstream os; os << "fork" << size();
std::string prefix = os.str();
forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
push_back(forkedBrokers.back()->getPort());
}
else { // Run in this process
addLocal();
}
}
namespace {
/** Parse broker & cluster options */
Broker::Options parseOpts(size_t argc, const char* argv[]) {
Broker::Options opts;
Plugin::addOptions(opts); // Pick up cluster options.
opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
return opts;
}
}
void ClusterFixture::addLocal() {
assert(int(size()) == localIndex);
ostringstream os; os << "local" << localIndex;
string prefix = os.str();
Args args(makeArgs(prefix, localIndex));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
qpid::log::Logger::instance().setPrefix(prefix);
localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0])));
push_back(localBroker->getPort());
forkedBrokers.push_back(shared_ptr<ForkedBroker>());
}
bool ClusterFixture::hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); }
/** Kill a forked broker with sig, or shutdown localBroker if n==0. */
void ClusterFixture::kill(size_t n, int sig) {
if (n == size_t(localIndex))
localBroker->broker->shutdown();
else
forkedBrokers[n]->kill(sig);
}
/** Kill a broker and suppressing errors from closing connection c. */
void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) {
ScopedSuppressLogging sl;
kill(n,sig);
try { c.close(); } catch(...) {}
}
/**
* Get the known broker ports from a Connection.
*@param n if specified wait for the cluster size to be n, up to a timeout.
*/
std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
std::vector<qpid::Url> urls = source.getKnownBrokers();
if (n >= 0 && unsigned(n) != urls.size()) {
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
qpid::sys::usleep(1000*100); // 0.1 secs
urls = source.getKnownBrokers();
}
}
std::set<int> s;
for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
s.insert((*i)[0].get<qpid::TcpAddress>()->port);
return s;
}